Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP][ISSUE #199] Support Kafka connector plugin and Kafka as event store #199 #376

Closed

Conversation

ruanwenjun
Copy link
Member

close #199

@qqeasonchen
Copy link
Contributor

@ruanwenjun good job!

@ruanwenjun ruanwenjun force-pushed the dev_wenjun_1.3.0kafka branch 4 times, most recently from 4450bbf to 39dda47 Compare June 8, 2021 11:25
@xwm1992
Copy link
Contributor

xwm1992 commented Jun 10, 2021

hi, @ruanwenjun I try to use pub/sub message with kafka connector, but I can't use this kafka plugin. Here are some problem below.
1.This pr code version behind latest code version, try to pull the latest code may be better. such as some packages name still contains webank in eventmesh-common module and eventmesh-api module.
2.when I try to use kafka plugin to pub/sub messages, the problem is:
image
jdk spi can't load this plugin, please help take a look, I'll continue to follow up related questions.

@xwm1992
Copy link
Contributor

xwm1992 commented Jun 10, 2021

OMSConsumerAdaptor and OMSProducerAdaptor these two adaptors implementations need constructor without parameters for jdk spi requirements, moving the constructors configuration to init method may be better. Except for the problem above, here also have problem below, please take a look ,thanks @ruanwenjun .
image

@ruanwenjun
Copy link
Member Author

ruanwenjun commented Jun 13, 2021

@xwm1992 Hi, I have a question. Currently, the branch 1.3.0 is behind the development very much, can I checkout directly from develop branch and push to 1.3.0? Thanks.

@xwm1992
Copy link
Contributor

xwm1992 commented Jun 14, 2021

@xwm1992 Hi, I have a question. Currently, the branch 1.3.0 is behind the development very much, can I checkout directly from develop branch and push to 1.3.0? Thanks.

It's ok, we will update the code to the latest version under the 1.3.0 branch, you can pull the latest code later and solve the conflicts, thanks

@ruanwenjun ruanwenjun force-pushed the dev_wenjun_1.3.0kafka branch 2 times, most recently from c76d8d8 to d3edf7a Compare July 1, 2021 12:12
@ruanwenjun ruanwenjun changed the title [ISSUE #199] Support Kafka connector plugin and Kafka as event store #199 [WIP][ISSUE #199] Support Kafka connector plugin and Kafka as event store #199 Jul 1, 2021
@ruanwenjun ruanwenjun changed the title [WIP][ISSUE #199] Support Kafka connector plugin and Kafka as event store #199 [ISSUE #199] Support Kafka connector plugin and Kafka as event store #199 Jul 2, 2021
@ruanwenjun
Copy link
Member Author

@xwm1992 Please help me test again, thanks a lot.

@codecov-commenter
Copy link

codecov-commenter commented Jul 4, 2021

Codecov Report

Merging #376 (8955c49) into develop (737c0f0) will increase coverage by 0.23%.
The diff coverage is 16.71%.

❗ Current head 8955c49 differs from pull request most recent head 14262af. Consider uploading reports for the commit 14262af to get more accurate results
Impacted file tree graph

@@              Coverage Diff              @@
##             develop     #376      +/-   ##
=============================================
+ Coverage       9.84%   10.07%   +0.23%     
- Complexity       283      310      +27     
=============================================
  Files            228      239      +11     
  Lines          10829    11212     +383     
  Branches         923      969      +46     
=============================================
+ Hits            1066     1130      +64     
- Misses          9666     9965     +299     
- Partials          97      117      +20     
Impacted Files Coverage Δ
...mesh/connector/kafka/MessagingAccessPointImpl.java 0.00% <0.00%> (ø)
...he/eventmesh/connector/kafka/common/Constants.java 0.00% <0.00%> (ø)
...onnector/kafka/common/OpenMessageDeserializer.java 0.00% <0.00%> (ø)
.../connector/kafka/consumer/KafkaMQConsumerImpl.java 0.00% <0.00%> (ø)
...ector/kafka/consumer/KafkaMeshConsumerAdaptor.java 0.00% <0.00%> (ø)
.../connector/kafka/producer/KafkaMQProducerImpl.java 0.00% <0.00%> (ø)
...ector/kafka/producer/KafkaMeshProducerAdaptor.java 5.55% <5.55%> (ø)
.../connector/kafka/consumer/KafkaConsumerConfig.java 35.13% <35.13%> (ø)
.../connector/kafka/common/OpenMessageSerializer.java 42.85% <42.85%> (ø)
.../connector/kafka/producer/KafkaProducerConfig.java 48.38% <48.38%> (ø)
... and 12 more

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 737c0f0...14262af. Read the comment docs.

@ruanwenjun ruanwenjun force-pushed the dev_wenjun_1.3.0kafka branch 2 times, most recently from f6bfaa6 to 227e357 Compare July 9, 2021 13:52
@ruanwenjun ruanwenjun changed the base branch from 1.3.0 to develop July 12, 2021 14:10
@ruanwenjun ruanwenjun force-pushed the dev_wenjun_1.3.0kafka branch 3 times, most recently from 64a68f3 to cd594b5 Compare July 12, 2021 15:51
@ruanwenjun ruanwenjun force-pushed the dev_wenjun_1.3.0kafka branch 2 times, most recently from b24d38b to eec5448 Compare July 21, 2021 11:57
@Jackzeng1224
Copy link

@ruanwenjun hello,I can’t send and receive messages normally with the Kafka module. It seems that the production of messages cannot be monitored. Can you see that messages are sent and received normally when you start project monitoring?

@ruanwenjun
Copy link
Member Author

@Jackzeng1224 Yes, I can see the log that the message has been sent to kafka.
You can use AsyncSyncRequestInstance to send async requests, and I find a bug in LiteProducer line 218.
https://github.com/apache/incubator-eventmesh/blob/737c0f09efeba61a62653d33fd553491af7d0deb/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/producer/LiteProducer.java#L207-L219

This method should be async, but it sends a sync command, I will fix in another pr.
And you can modified the line 218 to

requestParam.addHeader(ProtocolKey.REQUEST_CODE, String.valueOf(RequestCode.MSG_SEND_ASYNC.getRequestCode()))

Then you will see below log in runtime module:

2021-07-27 19:28:37,824 INFO  [kafka-producer-network-thread | producer-1] message(SendAsyncMessageProcessor.java:196) - message|eventMesh2mq|REQ|ASYNC|send2MQCost=4ms|topic=TEST-TOPIC-TCP-ASYNC|bizSeqNo=920594188447336930612967879404|uniqueId=243186356065686374194669481534

But the response in AsyncSyncRequestInstance is also failed, I think it might be another issue.

2021-07-27 19:48:40,956 DEBUG [main] AsyncSyncRequestInstance(AsyncSyncRequestInstance.java:81) - sendmsg failed
org.apache.eventmesh.common.EventMeshException: com.alibaba.fastjson.JSONException: syntax error,except start with { or [,but actually start with error
	at org.apache.eventmesh.client.http.producer.RRCallbackResponseHandlerAdapter.handleResponse(RRCallbackResponseHandlerAdapter.java:94) ~[eventmesh-sdk-java-1.2.0-SNAPSHOT.jar:1.2.0-SNAPSHOT]
	at org.apache.eventmesh.client.http.producer.RRCallbackResponseHandlerAdapter.handleResponse(RRCallbackResponseHandlerAdapter.java:41) ~[eventmesh-sdk-java-1.2.0-SNAPSHOT.jar:1.2.0-SNAPSHOT]
	at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:222) ~[httpclient-4.5.2.jar:4.5.2]
	at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:164) ~[httpclient-4.5.2.jar:4.5.2]
	at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:139) ~[httpclient-4.5.2.jar:4.5.2]
	at org.apache.eventmesh.client.http.http.HttpUtil.post(HttpUtil.java:149) ~[eventmesh-sdk-java-1.2.0-SNAPSHOT.jar:1.2.0-SNAPSHOT]
	at org.apache.eventmesh.client.http.producer.LiteProducer.request(LiteProducer.java:240) ~[eventmesh-sdk-java-1.2.0-SNAPSHOT.jar:1.2.0-SNAPSHOT]
	at org.apache.eventmesh.http.demo.AsyncSyncRequestInstance.main(AsyncSyncRequestInstance.java:73) ~[main/:?]
Caused by: com.alibaba.fastjson.JSONException: syntax error,except start with { or [,but actually start with error
	at com.alibaba.fastjson.parser.DefaultJSONParser.parseObject(DefaultJSONParser.java:684) ~[fastjson-1.2.71.jar:?]
	at com.alibaba.fastjson.JSON.parseObject(JSON.java:396) ~[fastjson-1.2.71.jar:?]
	at com.alibaba.fastjson.JSON.parseObject(JSON.java:300) ~[fastjson-1.2.71.jar:?]
	at com.alibaba.fastjson.JSON.parseObject(JSON.java:573) ~[fastjson-1.2.71.jar:?]
	at org.apache.eventmesh.client.http.producer.RRCallbackResponseHandlerAdapter.handleResponse(RRCallbackResponseHandlerAdapter.java:89) ~[eventmesh-sdk-java-1.2.0-SNAPSHOT.jar:1.2.0-SNAPSHOT]
	... 7 more

@Jackzeng1224
Copy link

@Jackzeng1224 Yes, I can see the log that the message has been sent to kafka.
You can use AsyncSyncRequestInstance to send async requests, and I find a bug in LiteProducer line 218.
https://github.com/apache/incubator-eventmesh/blob/737c0f09efeba61a62653d33fd553491af7d0deb/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/producer/LiteProducer.java#L207-L219

This method should be async, but it sends a sync command, I will fix in another pr.
And you can modified the line 218 to

requestParam.addHeader(ProtocolKey.REQUEST_CODE, String.valueOf(RequestCode.MSG_SEND_ASYNC.getRequestCode()))

Then you will see below log in runtime module:

2021-07-27 19:28:37,824 INFO  [kafka-producer-network-thread | producer-1] message(SendAsyncMessageProcessor.java:196) - message|eventMesh2mq|REQ|ASYNC|send2MQCost=4ms|topic=TEST-TOPIC-TCP-ASYNC|bizSeqNo=920594188447336930612967879404|uniqueId=243186356065686374194669481534

But the response in AsyncSyncRequestInstance is also failed, I think it might be another issue.

2021-07-27 19:48:40,956 DEBUG [main] AsyncSyncRequestInstance(AsyncSyncRequestInstance.java:81) - sendmsg failed
org.apache.eventmesh.common.EventMeshException: com.alibaba.fastjson.JSONException: syntax error,except start with { or [,but actually start with error
	at org.apache.eventmesh.client.http.producer.RRCallbackResponseHandlerAdapter.handleResponse(RRCallbackResponseHandlerAdapter.java:94) ~[eventmesh-sdk-java-1.2.0-SNAPSHOT.jar:1.2.0-SNAPSHOT]
	at org.apache.eventmesh.client.http.producer.RRCallbackResponseHandlerAdapter.handleResponse(RRCallbackResponseHandlerAdapter.java:41) ~[eventmesh-sdk-java-1.2.0-SNAPSHOT.jar:1.2.0-SNAPSHOT]
	at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:222) ~[httpclient-4.5.2.jar:4.5.2]
	at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:164) ~[httpclient-4.5.2.jar:4.5.2]
	at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:139) ~[httpclient-4.5.2.jar:4.5.2]
	at org.apache.eventmesh.client.http.http.HttpUtil.post(HttpUtil.java:149) ~[eventmesh-sdk-java-1.2.0-SNAPSHOT.jar:1.2.0-SNAPSHOT]
	at org.apache.eventmesh.client.http.producer.LiteProducer.request(LiteProducer.java:240) ~[eventmesh-sdk-java-1.2.0-SNAPSHOT.jar:1.2.0-SNAPSHOT]
	at org.apache.eventmesh.http.demo.AsyncSyncRequestInstance.main(AsyncSyncRequestInstance.java:73) ~[main/:?]
Caused by: com.alibaba.fastjson.JSONException: syntax error,except start with { or [,but actually start with error
	at com.alibaba.fastjson.parser.DefaultJSONParser.parseObject(DefaultJSONParser.java:684) ~[fastjson-1.2.71.jar:?]
	at com.alibaba.fastjson.JSON.parseObject(JSON.java:396) ~[fastjson-1.2.71.jar:?]
	at com.alibaba.fastjson.JSON.parseObject(JSON.java:300) ~[fastjson-1.2.71.jar:?]
	at com.alibaba.fastjson.JSON.parseObject(JSON.java:573) ~[fastjson-1.2.71.jar:?]
	at org.apache.eventmesh.client.http.producer.RRCallbackResponseHandlerAdapter.handleResponse(RRCallbackResponseHandlerAdapter.java:89) ~[eventmesh-sdk-java-1.2.0-SNAPSHOT.jar:1.2.0-SNAPSHOT]
	... 7 more

OK!I will continue to pay attention.

@qqeasonchen
Copy link
Contributor

@ruanwenjun Could you please submmit this feature to branch kafka-connector?

ruanwenjun and others added 9 commits July 28, 2021 15:07
* [ISSUE apache#456] Automatic License report and check

* Add mail

close apache#456
…sion (apache#459)

* modify:optimize flow control in downstreaming msg

* modify:optimize stategy of selecting session in downstream msg

* modify:optimize msg downstream,msg store in session

* modify:fix bug:not a @sharable handler

* modify:downstream broadcast msg asynchronously

* modify:remove unneccessary interface in eventmesh-connector-api

* modify:fix conflict

* modify:add license in EventMeshAction

* modify:fix ack problem

close apache#457
* modify:optimize flow control in downstreaming msg

* modify:optimize stategy of selecting session in downstream msg

* modify:optimize msg downstream,msg store in session

* modify:fix bug:not a @sharable handler

* modify:downstream broadcast msg asynchronously

* modify:remove unneccessary interface in eventmesh-connector-api

* modify:fix conflict

* modify:add license in EventMeshAction

* modify:fix global flow control problem

close apache#461
…iptionType is SYNC (apache#463)

* [ISSUE apache#460]Support custom retry times configuration when SubcriptionType is SYNC

* fix log args
… root build.gradle (apache#466)

* [ISSUE apache#465]All third part dependencies should be controlled at root build.gradle

* fix license
* [ISSUE apache#374] add unit test for PushMessageRequestHeader class.

* [ISSUE apache#374] add unit test for PushMessageResponseHeader class.

* [ISSUE apache#374] add unit test for ReplyMessageRequestHeader class.

* [ISSUE apache#374] add unit test for ReplyMessageResponseHeader class.

close apache#374
@ruanwenjun
Copy link
Member Author

@Jackzeng1224 Hi, I have updated the code, please help me test, thanks.
Currently, it just supports the async send message method as the same withe rocketmqProducer, so you can just use the publish method to send message.

lrhkobe and others added 3 commits August 2, 2021 16:30
…on close of client (apache#477)

* modify:optimize flow control in downstreaming msg

* modify:optimize stategy of selecting session in downstream msg

* modify:optimize msg downstream,msg store in session

* modify:fix bug:not a @sharable handler

* modify:downstream broadcast msg asynchronously

* modify:remove unneccessary interface in eventmesh-connector-api

* modify:fix conflict

* modify:add license in EventMeshAction

* modify:fix ack problem

* modify:fix exception handle when exception occured in EventMeshTcpMessageDispatcher

* modify:fix log print

close apache#476
@Jackzeng1224
Copy link

Jackzeng1224 commented Aug 4, 2021

@Jackzeng1224 Hi, I have updated the code, please help me test, thanks.
Currently, it just supports the async send message method as the same withe rocketmqProducer, so you can just use the publish method to send message.

OK 👌. I will test .

@Jackzeng1224
Copy link

@ruanwenjun hello,The latest code, I run SpringBootDemoApplication and an exception occurs. I think it is a package usage problem.I use rocketMq will not appear.
image

@Jackzeng1224
Copy link

@ruanwenjun hello,New code, still did not solve the problem.

@ruanwenjun ruanwenjun changed the title [ISSUE #199] Support Kafka connector plugin and Kafka as event store #199 [WIP][ISSUE #199] Support Kafka connector plugin and Kafka as event store #199 Aug 14, 2021
@ruanwenjun ruanwenjun marked this pull request as draft January 3, 2022 14:03
@xwm1992
Copy link
Contributor

xwm1992 commented Jul 12, 2022

Since the PR's code has not been updated for a long time, I will close this PR.

@xwm1992 xwm1992 closed this Jul 12, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Support Kafka connector plugin and Kafka as event store
9 participants