Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Issue #417] Grpc Transport Protocol support #701

Closed
wants to merge 150 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
150 commits
Select commit Hold shift + click to select a range
6f57a90
readme.md
xwm1992 Oct 29, 2021
88f8ffc
Merge branch 'develop' of https://github.com/apache/incubator-eventme…
xwm1992 Nov 2, 2021
1feaaa7
[ISSUE #563] SDK SUPPORT CLOUD EVENT (#575)
wangshaojie4039 Nov 2, 2021
deee538
[Feature #562] support cloudevents api in eventmesh-connector-api mod…
xwm1992 Nov 10, 2021
3ae9234
connector support cloud event (#586)
wangshaojie4039 Nov 16, 2021
fcde151
[Feature #562] Support CloudEvents for pub/sub in EventMesh runtime (…
xwm1992 Nov 16, 2021
6828f57
[Feature #562] Implement CloudEvents protocol adaptor (#595)
xwm1992 Nov 18, 2021
0096cee
[Feature #562] Implement EventMeshMessage protocol adaptor (#597)
xwm1992 Nov 18, 2021
db02781
Remove some unused code (#591)
ruanwenjun Nov 19, 2021
f8ffb44
fixed a wrong url in cn-doc (#585)
sarihuangshanrong Nov 19, 2021
0acafcc
resolve_exception->Consumer subscription topic is invalid #590 (#598)
hagsyn Nov 19, 2021
fcd1218
update Eventmeshmessage plugin (#599)
xwm1992 Nov 19, 2021
e53acaf
Update eventmesh-store-quickstart.md
qqeasonchen Nov 21, 2021
4f04d9d
Update README.md
qqeasonchen Nov 21, 2021
71bcb09
Add slack icon (#601)
ruanwenjun Nov 21, 2021
ac8db5a
Add protocol producer in java sdk (#600)
ruanwenjun Nov 21, 2021
bcc1503
Change Tcp interface (#603)
ruanwenjun Nov 23, 2021
0082f3d
update http push request and adaptor (#606)
xwm1992 Nov 23, 2021
c66c8f7
update java sdk (#607)
xwm1992 Nov 23, 2021
5f61c24
Fix standalone connector interface, fix example (#608)
ruanwenjun Nov 24, 2021
a0204ae
fix compile error (#609)
xwm1992 Nov 24, 2021
0268792
Java sdk update (#610)
xwm1992 Nov 24, 2021
1c34373
Add EventMeshTCPClient, this client wrap the sub/sub client (#611)
ruanwenjun Nov 25, 2021
d8eb87f
runtime update (#612)
xwm1992 Nov 25, 2021
f7d0fbb
Java sdk update (#615)
xwm1992 Nov 26, 2021
f3b9520
eventmesh-admin-rocketmq submodule and createTopic REST API (#530)
yzhao244 Nov 29, 2021
d1064cb
Fix some small issue (#616)
ruanwenjun Nov 29, 2021
351f3ba
eventmeshmessage protocol adaptor for tcp (#618)
xwm1992 Nov 29, 2021
e337b79
remove openschema implementation from em project
yzhao244 Nov 25, 2021
b46026d
eventmesh message protocol for http pub/sub with standalone connector…
xwm1992 Nov 30, 2021
232080e
create topic command (#531)
yzhao244 Nov 30, 2021
b50c42a
Java sdk update cloudevents pub/sub (#620)
xwm1992 Nov 30, 2021
69fab8f
Update README.md
qqeasonchen Nov 30, 2021
0036d0a
Change TCP Decoder and Encoder (#621)
ruanwenjun Dec 1, 2021
5d2721f
cloudevents protocol tcp pub/sub for sdk (#622)
xwm1992 Dec 1, 2021
376a629
cloudevents protocol tcp pub/sub for sdk (#623)
xwm1992 Dec 2, 2021
f90714f
cloudevents protocol http pub/sub for sdk (#625)
xwm1992 Dec 2, 2021
35c4da2
cloudevents protocol pub/sub for http sdk (#627)
xwm1992 Dec 3, 2021
c6b700d
Add AbstractEventMeshTCPSubHandler (#624)
ruanwenjun Dec 5, 2021
eaef797
cloudevents/eventmesh message protocol pub/sub for sdk in rocketmq-co…
xwm1992 Dec 6, 2021
3b82ce6
cloudevents/eventmesh message protocol pub/sub for sdk in rocketmq-co…
xwm1992 Dec 7, 2021
6b1f25b
Safely delete useless log4j dependencies
vongosling Dec 8, 2021
92287b0
Safely delete outdated metrics dependencies
vongosling Dec 8, 2021
9e07445
Sately delete useless collections4 dependency
vongosling Dec 8, 2021
d45028c
Safely delete useless commons dependencies
vongosling Dec 8, 2021
a96e218
Reback to the collections4, but let it not in api dependency
vongosling Dec 8, 2021
6f2368e
Could not remove text for Safe Ramdom implementation
vongosling Dec 8, 2021
2772704
Fix build error
vongosling Dec 8, 2021
5e5de4a
Add collection4
vongosling Dec 8, 2021
56f1172
Fix build error
vongosling Dec 8, 2021
751e1ca
Revert "Safely delete outdated metrics dependencies"
vongosling Dec 8, 2021
7405376
Change to api dependency
vongosling Dec 8, 2021
b65580b
Remove doclint warnings
vongosling Dec 8, 2021
961d998
Depracated checkstyle check error temporarily, we should remove it wh…
vongosling Dec 8, 2021
46471ed
sync request/response for sdk in rocketmq-connector (#634)
xwm1992 Dec 8, 2021
79fdb12
Update .asf.yaml
qqeasonchen Dec 8, 2021
20ab726
Update .asf.yaml
qqeasonchen Dec 8, 2021
766337a
Merge branch 'develop' of https://github.com/apache/incubator-eventme…
xwm1992 Dec 8, 2021
8ea15f6
fix conflicts and update the code (#635)
xwm1992 Dec 8, 2021
f1b4729
Merge branch 'cloudevents' of https://github.com/apache/incubator-eve…
xwm1992 Dec 8, 2021
baddf64
supply apache header
xwm1992 Dec 9, 2021
600e255
Merge pull request #638 from xwm1992/cloudevents
xwm1992 Dec 9, 2021
280898a
Merge pull request #613 from yzhao244/integrate-openschema-dependency
qqeasonchen Dec 9, 2021
7629819
Remove license name, add LICENSE file
ruanwenjun Sep 24, 2021
dcd0f65
resolve conflict
ruanwenjun Sep 24, 2021
d537106
remove some not used in binary license
ruanwenjun Sep 26, 2021
cb8746c
set connector plugin to standalone
ruanwenjun Oct 8, 2021
05ac1fe
Add license check shell
ruanwenjun Nov 29, 2021
fafd741
Merge pull request #535 from ruanwenjun/dev_wenjun_fixLicenseCheck
xwm1992 Dec 10, 2021
4a9e915
Add jdk11 and otherOS in github ci
ruanwenjun Nov 28, 2021
66d7dbe
Add comments
ruanwenjun Dec 10, 2021
4c44aa5
resolve javadoc failed on jdk11
ruanwenjun Dec 10, 2021
8085a71
Change skywalking uses repo to main
ruanwenjun Dec 10, 2021
64cf6b5
Merge pull request #617 from ruanwenjun/dev_wenjun_addJdk11
vongosling Dec 10, 2021
135d386
Merge remote-tracking branch 'upstream/grpc' into grpc2
jinrongluo Dec 10, 2021
1148ba9
GRPC producer publish API
jinrongluo Dec 10, 2021
c9fe5d7
remove unused files
qqeasonchen Dec 13, 2021
d2370d1
remove unused plugin
qqeasonchen Dec 13, 2021
3d48165
[ISSUE #630] RocketMQProducerImpl cannot load config properties from …
ruanwenjun Dec 13, 2021
d501fcb
[Bug #646] Missing the rocketmq message properties during protocol co…
xwm1992 Dec 13, 2021
35f2c5d
Merge remote-tracking branch 'origin/develop' into grpc2
jinrongluo Dec 13, 2021
0d0a7da
grpc publish with cloudevents
jinrongluo Dec 13, 2021
9393d72
[MINOR] Hide ctx in callback function and update contributing doc (#644)
ruanwenjun Dec 14, 2021
0deb401
[ISSUE #405] remove some unused code (#649)
xwm1992 Dec 14, 2021
6718b0b
Merge remote-tracking branch 'origin/develop' into grpc2
jinrongluo Dec 14, 2021
c24e083
[Issue #417] grpc publish API
jinrongluo Dec 14, 2021
a54d1f5
[MINOR] Add third-part dependencies licenses (#650)
ruanwenjun Dec 15, 2021
05f8a3c
[MINOR] supply the license and update the third party license file na…
xwm1992 Dec 15, 2021
a9fe559
[ISSUE #340]Add http trace http point (#527)
Roc-00 Dec 16, 2021
e32c9f5
Update quick start docs (#656)
ruanwenjun Dec 16, 2021
f7c2694
[MINOR] Upgrade log4j version to 2.16.0 (#654)
ruanwenjun Dec 16, 2021
f30588e
[MINOR] update the license and add the third party license files (#657)
xwm1992 Dec 16, 2021
1c7429b
[ISSUE #604]Improve the rebalance algorithm (#605)
lrhkobe Dec 16, 2021
b77646c
[Feature #547] Create and upload 1.3.0-snapshot docker image (#659)
xwm1992 Dec 17, 2021
9507956
grpc consumer work
jinrongluo Dec 17, 2021
c3a1b00
Merge remote-tracking branch 'origin/develop' into grpc-dev
jinrongluo Dec 17, 2021
11c5772
Adding grpc publish and subscribe API examples
jinrongluo Dec 17, 2021
9460f75
grpc consumer api testing and bug fixing
jinrongluo Dec 17, 2021
8805fb3
minor enhancement to grpc consumer
jinrongluo Dec 17, 2021
b946360
[MINOR] Upgrade log4j version to 2.17.0 (#662)
xwm1992 Dec 19, 2021
0b105de
update project version to 1.3.0-RELEASE
xwm1992 Dec 20, 2021
8f0670f
Delete gradle/wrapper directory
xwm1992 Dec 20, 2021
3f38b82
update docs
xwm1992 Dec 20, 2021
52205e3
Merge branch '1.3.0-RELEASE' of https://github.com/apache/incubator-e…
xwm1992 Dec 20, 2021
d6037d4
update Dockerfile and build.gradle
xwm1992 Dec 20, 2021
be0fb29
update build.gradle
xwm1992 Dec 20, 2021
2790b78
update Dockerfile path
xwm1992 Dec 20, 2021
edb5e19
GRPC consumer Stream API
jinrongluo Dec 21, 2021
63b8875
Grpc consumer API enhancement
jinrongluo Dec 22, 2021
858ca04
Grpc unsubscribe API
jinrongluo Dec 22, 2021
9af7e32
Grpc consumer Heartbeat API
jinrongluo Dec 22, 2021
d9a15e9
grpc consumer enhancement
jinrongluo Dec 23, 2021
944d4f5
enhance consumer push API
jinrongluo Dec 24, 2021
07485ad
enhance consumer push API
jinrongluo Dec 24, 2021
1e99c79
Update .asf.yaml
xwm1992 Dec 28, 2021
2175a4b
Update .asf.yaml
xwm1992 Dec 29, 2021
91ff4bc
[Infra] trigger branch protection change
Humbedooh Dec 29, 2021
4d220ba
[Infra] retrigger .asf.yaml protections
Humbedooh Dec 29, 2021
8454128
update some docs
qqeasonchen Dec 20, 2021
84efbda
update Dockerfile path
xwm1992 Dec 20, 2021
53a4b94
Update java sdk docs (#663)
ruanwenjun Dec 21, 2021
5592df5
Add files via upload
xwm1992 Dec 23, 2021
7aaa349
[Infra] retrigger .asf.yaml protections
Humbedooh Dec 29, 2021
1ddb0bf
Merge pull request #675 from xwm1992/master
xwm1992 Dec 30, 2021
9b76a9e
Update .asf.yaml
xwm1992 Dec 30, 2021
5f42f42
Fix bugs in consumer API
Dec 30, 2021
8519fb0
[ISSUE #670] fix checkstyle check fail (#680)
jonyangx Dec 31, 2021
5adc007
[MINOR] Fixed redundant boxing operations (#684)
li-xiao-shuang Dec 31, 2021
1654c47
[MINOR] new Runnable() can be replaced with lambda (#685)
li-xiao-shuang Dec 31, 2021
fd60ab9
[MINOR] ConfigurationWrapper class adds thread pool shutdown (#683)
li-xiao-shuang Dec 31, 2021
dc7810e
GRPC RequestReply API
jinrongluo Dec 31, 2021
852c22b
[ISSUE #677] Translate readme files from English to Chinese (#678)
li-xiao-shuang Jan 4, 2022
2b3e861
[ISSUE #405]update cloudevents examples (#688)
li-xiao-shuang Jan 4, 2022
7fdcd8d
grpc batchpublish API
jinrongluo Jan 4, 2022
52d423f
Bump gradle version to 7.3.3 Support Java17 build
shoothzj Jan 5, 2022
a1140ab
Merge pull request #689 from Shoothzj/gradle-7-3
qqeasonchen Jan 5, 2022
032bb0d
[ISSUE #690]Remove extra code style check job in CI (#691)
ruanwenjun Jan 5, 2022
0cadf29
Update intro.md (#693)
qqeasonchen Jan 5, 2022
9fca946
Add files via upload (#694)
qqeasonchen Jan 5, 2022
80c4bf8
removing broadcast API
jinrongluo Jan 5, 2022
5c0f8a9
[ISSUE #692]Change the default merge strategy to squash (#695)
ruanwenjun Jan 6, 2022
8ac37a0
[ISSUE #673] update eventmesh-runtime-quickstart-with-docker.md en & …
jzhou59 Jan 6, 2022
a9d102c
[MINOR] Change Tar and Zip name (#699)
ruanwenjun Jan 6, 2022
5c89a23
[Issue #417] support cloudevents in GRPC
jinrongluo Jan 7, 2022
8a50a00
Merge remote-tracking branch 'origin/master' into grpc-dev
jinrongluo Jan 7, 2022
70080e7
[Issue #702] Fix Slack Join link (#705)
shoothzj Jan 10, 2022
80aa61a
Updated Notice file to 2022 (#704)
shoothzj Jan 10, 2022
5dc964b
add instruction docs of trace and metrics in eventmesh (#706)
Roc-00 Jan 10, 2022
db6b34f
[ISSUE #405]Fix args typo in examples (#707)
shoothzj Jan 10, 2022
42ba9fa
Merge remote-tracking branch 'origin/master' into grpc-dev
jinrongluo Jan 10, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
connector support cloud event (#586)
Co-authored-by: wangshaojie <[email protected]>
  • Loading branch information
wangshaojie4039 and wangshaojie committed Nov 16, 2021
commit 3ae92347ca341a34afae7d790e5e72aa34d7e2dd
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,6 @@
import io.openmessaging.api.transaction.LocalTransactionChecker;
import io.openmessaging.api.transaction.TransactionProducer;

import org.apache.eventmesh.connector.rocketmq.consumer.PushConsumerImpl;
import org.apache.eventmesh.connector.rocketmq.producer.ProducerImpl;

public class MessagingAccessPointImpl implements MessagingAccessPoint {

private Properties accessPointProperties;
Expand All @@ -52,7 +49,7 @@ public Properties attributes() {

@Override
public Producer createProducer(Properties properties) {
return new ProducerImpl(this.accessPointProperties);
return null;
}

@Override
Expand All @@ -72,7 +69,7 @@ public TransactionProducer createTransactionProducer(Properties properties) {

@Override
public Consumer createConsumer(Properties properties) {
return new PushConsumerImpl(properties);
return null;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.connector.rocketmq.cloudevent;

import org.apache.rocketmq.common.message.Message;

import java.util.Map;

import javax.annotation.ParametersAreNonnullByDefault;

import io.cloudevents.core.message.MessageReader;
import io.cloudevents.core.message.MessageWriter;
import io.cloudevents.core.message.impl.GenericStructuredMessageReader;
import io.cloudevents.core.message.impl.MessageUtils;
import io.cloudevents.lang.Nullable;
import io.cloudevents.rw.CloudEventRWException;
import io.cloudevents.rw.CloudEventWriter;

import org.apache.eventmesh.connector.rocketmq.cloudevent.impl.RocketMQBinaryMessageReader;
import org.apache.eventmesh.connector.rocketmq.cloudevent.impl.RocketMQHeaders;
import org.apache.eventmesh.connector.rocketmq.cloudevent.impl.RocketMQMessageWriter;


@ParametersAreNonnullByDefault
public final class RocketMQMessageFactory {

private RocketMQMessageFactory() {
// prevent instantiation
}

public static MessageReader createReader(final Message message) throws CloudEventRWException {
return createReader(message.getProperties(), message.getBody());
}


public static MessageReader createReader(final Map<String, String> props,
@Nullable final byte[] body)
throws CloudEventRWException {

return MessageUtils.parseStructuredOrBinaryMessage(
() -> props.get(RocketMQHeaders.CONTENT_TYPE),
format -> new GenericStructuredMessageReader(format, body),
() -> props.get(RocketMQHeaders.SPEC_VERSION),
sv -> new RocketMQBinaryMessageReader(sv, props, body)
);
}


public static MessageWriter<CloudEventWriter<Message>, Message> createWriter(String topic) {
return new RocketMQMessageWriter<>(topic);
}

public static MessageWriter<CloudEventWriter<Message>, Message> createWriter(String topic,
String keys) {
return new RocketMQMessageWriter<>(topic, keys);
}

public static MessageWriter<CloudEventWriter<Message>, Message> createWriter(String topic,
String keys,
String tags) {
return new RocketMQMessageWriter<>(topic, keys, tags);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.connector.rocketmq.cloudevent.impl;

import java.util.Map;
import java.util.Objects;
import java.util.function.BiConsumer;

import io.cloudevents.SpecVersion;
import io.cloudevents.core.data.BytesCloudEventData;
import io.cloudevents.core.message.impl.BaseGenericBinaryMessageReaderImpl;

public class RocketMQBinaryMessageReader
extends BaseGenericBinaryMessageReaderImpl<String, String> {

private final Map<String, String> headers;

public RocketMQBinaryMessageReader(SpecVersion version, Map<String, String> headers,
byte[] payload) {
super(version,
payload != null && payload.length > 0 ? BytesCloudEventData.wrap(payload) : null);

Objects.requireNonNull(headers);
this.headers = headers;
}

@Override
protected boolean isContentTypeHeader(String key) {
return key.equals(RocketMQHeaders.CONTENT_TYPE);
}

@Override
protected boolean isCloudEventsHeader(String key) {
return key.length() > 3 && key.substring(0, RocketMQHeaders.CE_PREFIX.length())
.startsWith(RocketMQHeaders.CE_PREFIX);
}

@Override
protected String toCloudEventsKey(String key) {
return key.substring(RocketMQHeaders.CE_PREFIX.length()).toLowerCase();
}

@Override
protected void forEachHeader(BiConsumer<String, String> fn) {
this.headers.forEach((k, v) -> fn.accept(k, v));
}

@Override
protected String toCloudEventsValue(String value) {
return value;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.connector.rocketmq.cloudevent.impl;

import java.util.Map;

import io.cloudevents.core.message.impl.MessageUtils;
import io.cloudevents.core.v1.CloudEventV1;

public class RocketMQHeaders {

public static final String CE_PREFIX = "CE_";

protected static final Map<String, String> ATTRIBUTES_TO_HEADERS =
MessageUtils.generateAttributesToHeadersMapping(v -> CE_PREFIX + v);

public static final String CONTENT_TYPE =
ATTRIBUTES_TO_HEADERS.get(CloudEventV1.DATACONTENTTYPE);

public static final String SPEC_VERSION = ATTRIBUTES_TO_HEADERS.get(CloudEventV1.SPECVERSION);


}

Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.connector.rocketmq.cloudevent.impl;

import org.apache.rocketmq.common.message.Message;

import io.cloudevents.CloudEventData;
import io.cloudevents.SpecVersion;
import io.cloudevents.core.format.EventFormat;
import io.cloudevents.core.message.MessageWriter;
import io.cloudevents.rw.CloudEventContextWriter;
import io.cloudevents.rw.CloudEventRWException;
import io.cloudevents.rw.CloudEventWriter;


public final class RocketMQMessageWriter<R>
implements MessageWriter<CloudEventWriter<Message>, Message>, CloudEventWriter<Message> {

private Message message;


public RocketMQMessageWriter(String topic) {
message = new Message();
message.setTopic(topic);
}

public RocketMQMessageWriter(String topic, String keys) {
message = new Message();

message.setTopic(topic);

if (keys != null && keys.length() > 0) {
message.setKeys(keys);
}
}

public RocketMQMessageWriter(String topic, String keys, String tags) {
message = new Message();

message.setTopic(topic);

if (tags != null && tags.length() > 0) {
message.setTags(tags);
}

if (keys != null && keys.length() > 0) {
message.setKeys(keys);
}
}


@Override
public CloudEventContextWriter withContextAttribute(String name, String value)
throws CloudEventRWException {

String propName = RocketMQHeaders.ATTRIBUTES_TO_HEADERS.get(name);
if (propName == null) {
propName = RocketMQHeaders.CE_PREFIX + name;
}
message.putUserProperty(propName, value);
return this;
}

@Override
public RocketMQMessageWriter<R> create(final SpecVersion version) {
message.putUserProperty(RocketMQHeaders.SPEC_VERSION, version.toString());
return this;
}

@Override
public Message setEvent(final EventFormat format, final byte[] value)
throws CloudEventRWException {
message.putUserProperty(RocketMQHeaders.CONTENT_TYPE, format.serializedContentType());
message.setBody(value);
return message;
}

@Override
public Message end(final CloudEventData data) throws CloudEventRWException {
message.setBody(data.toBytes());
return message;
}

@Override
public Message end() {
message.setBody(null);
return message;
}
}
Loading