Skip to content

Commit

Permalink
[Feature apache#562] Implement CloudEvents adaptor
Browse files Browse the repository at this point in the history
  • Loading branch information
xwm1992 committed Nov 17, 2021
1 parent cf8433a commit 18310a1
Show file tree
Hide file tree
Showing 21 changed files with 460 additions and 86 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@ public class ProtocolKey {
public static final String LANGUAGE = "Language";
public static final String VERSION = "Version";

public static final String PROTOCOL_TYPE = "protocol_type";

public static final String PROTOCOL_VERSION = "protocol_version";

public static final String PROTOCOL_DESC = "protocol_desc";

public static class ClientInstanceKey {
////////////////////////////////////Protocol layer requester description///////////
public static final String ENV = "Env";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,15 @@ public class ReplyMessageRequestHeader extends Header {
//protocol version adopted by requester, default:1.0
private ProtocolVersion version;

//protocol type, cloudevents or eventmeshMessage
private String protocolType;

//protocol version, cloudevents:1.0 or 0.3
private String protocolVersion;

//protocol desc
private String protocolDesc;

//the environment number of the requester
private String env;

Expand Down Expand Up @@ -139,10 +148,37 @@ public void setIp(String ip) {
this.ip = ip;
}

public String getProtocolType() {
return protocolType;
}

public void setProtocolType(String protocolType) {
this.protocolType = protocolType;
}

public String getProtocolVersion() {
return protocolVersion;
}

public void setProtocolVersion(String protocolVersion) {
this.protocolVersion = protocolVersion;
}

public String getProtocolDesc() {
return protocolDesc;
}

public void setProtocolDesc(String protocolDesc) {
this.protocolDesc = protocolDesc;
}

public static ReplyMessageRequestHeader buildHeader(Map<String, Object> headerParam) {
ReplyMessageRequestHeader header = new ReplyMessageRequestHeader();
header.setCode(MapUtils.getString(headerParam, ProtocolKey.REQUEST_CODE));
header.setVersion(ProtocolVersion.get(MapUtils.getString(headerParam, ProtocolKey.VERSION)));
header.setProtocolType(MapUtils.getString(headerParam, ProtocolKey.PROTOCOL_TYPE));
header.setProtocolVersion(MapUtils.getString(headerParam, ProtocolKey.PROTOCOL_VERSION));
header.setProtocolDesc(MapUtils.getString(headerParam, ProtocolKey.PROTOCOL_DESC));
String lan = StringUtils.isBlank(MapUtils.getString(headerParam, ProtocolKey.LANGUAGE))
? Constants.LANGUAGE_JAVA : MapUtils.getString(headerParam, ProtocolKey.LANGUAGE);
header.setLanguage(lan);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,15 @@ public class SendMessageBatchRequestHeader extends Header {
//protocol version adopted by requester, default:1.0
private ProtocolVersion version;

//protocol type, cloudevents or eventmeshMessage
private String protocolType;

//protocol version, cloudevents:1.0 or 0.3
private String protocolVersion;

//protocol desc
private String protocolDesc;

//the environment number of the requester
private String env;

Expand Down Expand Up @@ -140,10 +149,37 @@ public void setIp(String ip) {
this.ip = ip;
}

public String getProtocolType() {
return protocolType;
}

public void setProtocolType(String protocolType) {
this.protocolType = protocolType;
}

public String getProtocolVersion() {
return protocolVersion;
}

public void setProtocolVersion(String protocolVersion) {
this.protocolVersion = protocolVersion;
}

public String getProtocolDesc() {
return protocolDesc;
}

public void setProtocolDesc(String protocolDesc) {
this.protocolDesc = protocolDesc;
}

public static SendMessageBatchRequestHeader buildHeader(final Map<String, Object> headerParam) {
SendMessageBatchRequestHeader header = new SendMessageBatchRequestHeader();
header.setCode(MapUtils.getString(headerParam, ProtocolKey.REQUEST_CODE));
header.setVersion(ProtocolVersion.get(MapUtils.getString(headerParam, ProtocolKey.VERSION)));
header.setProtocolType(MapUtils.getString(headerParam, ProtocolKey.PROTOCOL_TYPE));
header.setProtocolVersion(MapUtils.getString(headerParam, ProtocolKey.PROTOCOL_VERSION));
header.setProtocolDesc(MapUtils.getString(headerParam, ProtocolKey.PROTOCOL_DESC));
String lan = StringUtils.isBlank(MapUtils.getString(headerParam, ProtocolKey.LANGUAGE))
? Constants.LANGUAGE_JAVA : MapUtils.getString(headerParam, ProtocolKey.LANGUAGE);
header.setLanguage(lan);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,15 @@ public class SendMessageBatchV2RequestHeader extends Header {
//protocol version adopted by requester, default:1.0
private ProtocolVersion version;

//protocol type, cloudevents or eventmeshMessage
private String protocolType;

//protocol version, cloudevents:1.0 or 0.3
private String protocolVersion;

//protocol desc
private String protocolDesc;

//the environment number of the requester
private String env;

Expand Down Expand Up @@ -139,10 +148,38 @@ public void setIp(String ip) {
this.ip = ip;
}

public String getProtocolType() {
return protocolType;
}

public void setProtocolType(String protocolType) {
this.protocolType = protocolType;
}

public String getProtocolVersion() {
return protocolVersion;
}

public void setProtocolVersion(String protocolVersion) {
this.protocolVersion = protocolVersion;
}

public String getProtocolDesc() {
return protocolDesc;
}

public void setProtocolDesc(String protocolDesc) {
this.protocolDesc = protocolDesc;
}

public static SendMessageBatchV2RequestHeader buildHeader(final Map<String, Object> headerParam) {
SendMessageBatchV2RequestHeader header = new SendMessageBatchV2RequestHeader();
header.setCode(MapUtils.getString(headerParam, ProtocolKey.REQUEST_CODE));
header.setVersion(ProtocolVersion.get(MapUtils.getString(headerParam, ProtocolKey.VERSION)));
header.setProtocolType(MapUtils.getString(headerParam, ProtocolKey.PROTOCOL_TYPE));
header.setProtocolVersion(MapUtils.getString(headerParam, ProtocolKey.PROTOCOL_VERSION));
header.setProtocolDesc(MapUtils.getString(headerParam, ProtocolKey.PROTOCOL_DESC));

String lan = StringUtils.isBlank(MapUtils.getString(headerParam, ProtocolKey.LANGUAGE))
? Constants.LANGUAGE_JAVA : MapUtils.getString(headerParam, ProtocolKey.LANGUAGE);
header.setLanguage(lan);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,15 @@ public class SendMessageRequestHeader extends Header {
//protocol version adopted by requester, default:1.0
private ProtocolVersion version;

//protocol type, cloudevents or eventmeshMessage
private String protocolType;

//protocol version, cloudevents:1.0 or 0.3
private String protocolVersion;

//protocol desc
private String protocolDesc;

//the environment number of the requester
private String env;

Expand Down Expand Up @@ -139,10 +148,38 @@ public void setIp(String ip) {
this.ip = ip;
}

public String getProtocolType() {
return protocolType;
}

public void setProtocolType(String protocolType) {
this.protocolType = protocolType;
}

public String getProtocolVersion() {
return protocolVersion;
}

public void setProtocolVersion(String protocolVersion) {
this.protocolVersion = protocolVersion;
}

public String getProtocolDesc() {
return protocolDesc;
}

public void setProtocolDesc(String protocolDesc) {
this.protocolDesc = protocolDesc;
}

public static SendMessageRequestHeader buildHeader(Map<String, Object> headerParam) {
SendMessageRequestHeader header = new SendMessageRequestHeader();
header.setCode(MapUtils.getString(headerParam, ProtocolKey.REQUEST_CODE));
header.setVersion(ProtocolVersion.get(MapUtils.getString(headerParam, ProtocolKey.VERSION)));
header.setProtocolType(MapUtils.getString(headerParam, ProtocolKey.PROTOCOL_TYPE));
header.setProtocolVersion(MapUtils.getString(headerParam, ProtocolKey.PROTOCOL_VERSION));
header.setProtocolDesc(MapUtils.getString(headerParam, ProtocolKey.PROTOCOL_DESC));

String lan = StringUtils.isBlank(MapUtils.getString(headerParam, ProtocolKey.LANGUAGE))
? Constants.LANGUAGE_JAVA : MapUtils.getString(headerParam, ProtocolKey.LANGUAGE);
header.setLanguage(lan);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,20 @@
package org.apache.eventmesh.protocol.cloudevents;

import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;
import org.apache.commons.lang3.StringUtils;
import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.common.command.HttpCommand;
import org.apache.eventmesh.common.protocol.http.body.Body;
import org.apache.eventmesh.common.protocol.http.common.RequestCode;
import org.apache.eventmesh.common.protocol.tcp.Header;
import org.apache.eventmesh.common.protocol.tcp.Package;
import org.apache.eventmesh.protocol.api.ProtocolAdaptor;

import org.apache.eventmesh.protocol.api.exception.ProtocolHandleException;
import org.apache.eventmesh.protocol.cloudevents.resolver.http.SendMessageBatchProtocolResolver;
import org.apache.eventmesh.protocol.cloudevents.resolver.http.SendMessageBatchV2ProtocolResolver;
import org.apache.eventmesh.protocol.cloudevents.resolver.http.SendMessageRequestProtocolResolver;
import org.apache.eventmesh.protocol.cloudevents.resolver.tcp.TcpMessageProtocolResolver;

import java.nio.charset.StandardCharsets;
import java.util.List;
Expand All @@ -42,55 +46,41 @@ public class CloudEventsProtocolAdaptor<T> implements ProtocolAdaptor<T> {
@Override
public CloudEvent toCloudEvent(T cloudEvent) throws ProtocolHandleException {

CloudEventBuilder cloudEventBuilder;

if (cloudEvent instanceof Package) {

Header header = ((Package) cloudEvent).getHeader();
Object body = ((Package) cloudEvent).getBody();
String protocolType = header.getProperty(Constants.PROTOCOL_TYPE).toString();
String protocolVersion = header.getProperty(Constants.PROTOCOL_VERSION).toString();
String protocolDesc = header.getProperty(Constants.PROTOCOL_DESC).toString();

if (StringUtils.isBlank(protocolType)
|| StringUtils.isBlank(protocolVersion)
|| StringUtils.isBlank(protocolDesc)) {
throw new ProtocolHandleException(String.format("invalid protocol params protocolType %s|protocolVersion %s|protocolDesc %s",
protocolType, protocolVersion, protocolDesc));
}

if (!StringUtils.equals("cloudevents", protocolType)) {
throw new ProtocolHandleException(String.format("Unsupported protocolType: %s", protocolType));
}
if (StringUtils.equals("1.0", protocolVersion)) {
cloudEventBuilder = CloudEventBuilder.v1((CloudEvent) body);

for (String propKey : header.getProperties().keySet()) {
cloudEventBuilder.withExtension(propKey, header.getProperty(propKey).toString());
}
return deserializeTcpProtocol(header, body);

return cloudEventBuilder.build();
} else if (cloudEvent instanceof HttpCommand) {
org.apache.eventmesh.common.protocol.http.header.Header header = ((HttpCommand) cloudEvent).getHeader();
Body body = ((HttpCommand) cloudEvent).getBody();
String requestCode = ((HttpCommand) cloudEvent).getRequestCode();

} else if (StringUtils.equals("0.3", protocolVersion)) {
cloudEventBuilder = CloudEventBuilder.v03((CloudEvent) body);
return deserializeHttpProtocol(requestCode, header, body);
} else {
throw new ProtocolHandleException(String.format("protocol class: %s", cloudEvent.getClass()));
}
}

for (String propKey : header.getProperties().keySet()) {
cloudEventBuilder.withExtension(propKey, header.getProperty(propKey).toString());
}
private CloudEvent deserializeTcpProtocol(Header header, Object body) throws ProtocolHandleException {
return TcpMessageProtocolResolver.buildEvent(header, body);
}

return cloudEventBuilder.build();
private CloudEvent deserializeHttpProtocol(String requestCode, org.apache.eventmesh.common.protocol.http.header.Header header, Body body) throws ProtocolHandleException {

} else {
throw new ProtocolHandleException(String.format("Unsupported protocolVersion: %s", protocolVersion));
}
} else if (cloudEvent instanceof HttpCommand) {
org.apache.eventmesh.common.protocol.http.header.Header header = ((HttpCommand) cloudEvent).getHeader();
Body body = ((HttpCommand) cloudEvent).getBody();
//todo:convert httpCommand to cloudevents
if (String.valueOf(RequestCode.MSG_BATCH_SEND.getRequestCode()).equals(requestCode)) {
return SendMessageBatchProtocolResolver.buildEvent(header, body);
} else if (String.valueOf(RequestCode.MSG_BATCH_SEND_V2.getRequestCode()).equals(requestCode)) {
return SendMessageBatchV2ProtocolResolver.buildEvent(header, body);
} else if (String.valueOf(RequestCode.MSG_SEND_SYNC.getRequestCode()).equals(requestCode)) {
return SendMessageRequestProtocolResolver.buildEvent(header, body);
} else if (String.valueOf(RequestCode.MSG_SEND_ASYNC.getRequestCode()).equals(requestCode)) {
return SendMessageRequestProtocolResolver.buildEvent(header, body);
} else {
throw new ProtocolHandleException("protocol class: " + cloudEvent.getClass());
throw new ProtocolHandleException(String.format("unsupported requestCode: %s", requestCode));
}
return null;

}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package org.apache.eventmesh.protocol.cloudevents.resolver.http;

import io.cloudevents.CloudEvent;
import org.apache.eventmesh.common.protocol.http.body.Body;
import org.apache.eventmesh.common.protocol.http.header.Header;

public class SendMessageBatchProtocolResolver {
public static CloudEvent buildEvent(Header header, Body body) {
return null;
}
}
Loading

0 comments on commit 18310a1

Please sign in to comment.