Skip to content

Commit

Permalink
fix(java interop): fixed mismatch content-type
Browse files Browse the repository at this point in the history
  • Loading branch information
SHA-4096 committed Jul 5, 2024
1 parent 250909d commit 295cb9b
Show file tree
Hide file tree
Showing 7 changed files with 70 additions and 9 deletions.
4 changes: 4 additions & 0 deletions protocol/triple/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ func (s *Server) Start(invoker protocol.Invoker, info *server.ServiceInfo) {
// todo:https:// move tls config to handleService

hanOpts := getHanOpts(URL)
//Set expected codec name from serviceinfo
hanOpts = append(hanOpts, tri.WithExpectedCodecName(serialization))
intfName := URL.Interface()
if info != nil {
// new triple idl mode
Expand Down Expand Up @@ -119,6 +121,8 @@ func (s *Server) RefreshService(invoker protocol.Invoker, info *server.ServiceIn
panic(fmt.Sprintf("Unsupported serialization: %s", serialization))
}
hanOpts := getHanOpts(URL)
//Set expected codec name from serviceinfo
hanOpts = append(hanOpts, tri.WithExpectedCodecName(serialization))
intfName := URL.Interface()
if info != nil {
s.handleServiceWithInfo(intfName, invoker, info, hanOpts...)
Expand Down
19 changes: 17 additions & 2 deletions protocol/triple/triple_protocol/envelope.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"bytes"
"encoding/binary"
"errors"
"github.com/dubbogo/gost/log/logger"
"io"
)

Expand Down Expand Up @@ -51,6 +52,7 @@ func (e *envelope) IsSet(flag uint8) bool {
type envelopeWriter struct {
writer io.Writer
codec Codec
backupCodec Codec
compressMinBytes int
compressionPool *compressionPool
bufferPool *bufferPool
Expand All @@ -70,7 +72,13 @@ func (w *envelopeWriter) Marshal(message interface{}) *Error {
}
raw, err := w.codec.Marshal(message)
if err != nil {
return errorf(CodeInternal, "marshal message: %w", err)
if w.codec.Name() != w.backupCodec.Name() {
logger.Warnf("failed to marshal message with codec %s, trying alternative codec %s", w.codec.Name(), w.backupCodec.Name())
raw, err = w.backupCodec.Marshal(message)
}
if err != nil {
return errorf(CodeInternal, "marshal message: %w", err)
}
}
// We can't avoid allocating the byte slice, so we may as well reuse it once
// we're done with it.
Expand Down Expand Up @@ -127,6 +135,7 @@ func (w *envelopeWriter) write(env *envelope) *Error {
type envelopeReader struct {
reader io.Reader
codec Codec
backupCodec Codec //backupCodec is for mismatch between expected codec and content-type
last envelope
compressionPool *compressionPool
bufferPool *bufferPool
Expand Down Expand Up @@ -189,7 +198,13 @@ func (r *envelopeReader) Unmarshal(message interface{}) *Error {
}

if err := r.codec.Unmarshal(data.Bytes(), message); err != nil {
return errorf(CodeInvalidArgument, "unmarshal into %T: %w", message, err)
if r.backupCodec.Name() != r.codec.Name() {
logger.Warnf("failed to unmarshal message with codec %s, trying alternative codec %s", r.codec.Name(), r.backupCodec.Name())
err = r.backupCodec.Unmarshal(data.Bytes(), message)
}
if err != nil {
return errorf(CodeInvalidArgument, "unmarshal into %T: %w", message, err)
}
}
return nil
}
Expand Down
11 changes: 7 additions & 4 deletions protocol/triple/triple_protocol/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,7 @@ type handlerConfig struct {
CompressionPools map[string]*compressionPool
CompressionNames []string
Codecs map[string]Codec
ExpectedCodecName string
CompressMinBytes int
Interceptor Interceptor
Procedure string
Expand Down Expand Up @@ -424,16 +425,18 @@ func (c *handlerConfig) newProtocolHandlers(streamType StreamType) []protocolHan
// protocol -> protocolHandler
handlers := make([]protocolHandler, 0, len(protocols))
// initialize codec and compressor
codecs := newReadOnlyCodecs(c.Codecs)
compressors := newReadOnlyCompressionPools(
c.CompressionPools,
c.CompressionNames,
)
codecs := newReadOnlyCodecs(c.Codecs)

for _, protocol := range protocols {
handlers = append(handlers, protocol.NewHandler(&protocolHandlerParams{
Spec: c.newSpec(streamType),
Codecs: codecs,
CompressionPools: compressors,
Spec: c.newSpec(streamType),
Codecs: codecs,
CompressionPools: compressors,
ExpectedCodecName: c.ExpectedCodecName,
// config content
CompressMinBytes: c.CompressMinBytes,
BufferPool: c.BufferPool,
Expand Down
16 changes: 16 additions & 0 deletions protocol/triple/triple_protocol/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,10 @@ func WithVersion(version string) Option {
return &versionOption{version}
}

func WithExpectedCodecName(ExpectedCodecName string) Option {
return &ExpectedCodecNameOption{ExpectedCodecName: ExpectedCodecName}
}

// Option implements both [ClientOption] and [HandlerOption], so it can be
// applied both client-side and server-side.
type Option interface {
Expand Down Expand Up @@ -471,6 +475,18 @@ func (o *idempotencyOption) applyToHandler(config *handlerConfig) {
config.IdempotencyLevel = o.idempotencyLevel
}

type ExpectedCodecNameOption struct {
ExpectedCodecName string
}

func (o *ExpectedCodecNameOption) applyToClient(config *clientConfig) {
//Do nothing as client doesn't have codec issues
}

func (o *ExpectedCodecNameOption) applyToHandler(config *handlerConfig) {
config.ExpectedCodecName = o.ExpectedCodecName
}

type tripleOption struct{}

func (o *tripleOption) applyToClient(config *clientConfig) {
Expand Down
1 change: 1 addition & 0 deletions protocol/triple/triple_protocol/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ type protocol interface {
type protocolHandlerParams struct {
Spec Spec
Codecs readOnlyCodecs
ExpectedCodecName string
CompressionPools readOnlyCompressionPools
CompressMinBytes int
BufferPool *bufferPool
Expand Down
3 changes: 3 additions & 0 deletions protocol/triple/triple_protocol/protocol_grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ func (g *grpcHandler) NewConn(
// content-type -> codecName -> codec
codecName := grpcCodecFromContentType(getHeaderCanonical(request.Header, headerContentType))
codec := g.Codecs.Get(codecName) // handler.go guarantees this is not nil
backupCodec := g.Codecs.Get(g.ExpectedCodecName)
protocolName := ProtocolGRPC
conn := wrapHandlerConnWithCodedErrors(&grpcHandlerConn{
spec: g.Spec,
Expand All @@ -203,6 +204,7 @@ func (g *grpcHandler) NewConn(
writer: responseWriter,
compressionPool: g.CompressionPools.Get(responseCompression),
codec: codec,
backupCodec: backupCodec,
compressMinBytes: g.CompressMinBytes,
bufferPool: g.BufferPool,
sendMaxBytes: g.SendMaxBytes,
Expand All @@ -216,6 +218,7 @@ func (g *grpcHandler) NewConn(
envelopeReader: envelopeReader{
reader: request.Body,
codec: codec,
backupCodec: backupCodec,
compressionPool: g.CompressionPools.Get(requestCompression),
bufferPool: g.BufferPool,
readMaxBytes: g.ReadMaxBytes,
Expand Down
25 changes: 22 additions & 3 deletions protocol/triple/triple_protocol/protocol_triple.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"encoding/json"
"errors"
"fmt"
"github.com/dubbogo/gost/log/logger"
"io"
"math"
"net/http"
Expand Down Expand Up @@ -150,13 +151,14 @@ func (h *tripleHandler) NewConn(
var requestBody io.ReadCloser
var contentType, codecName string
requestBody = request.Body
//Prioritize codec specified by content-type
contentType = getHeaderCanonical(request.Header, headerContentType)
codecName = tripleCodecFromContentType(
h.Spec.StreamType,
contentType,
)

codec := h.Codecs.Get(codecName)
backupCodec := h.Codecs.Get(h.ExpectedCodecName)
// todo:https:// need to figure it out
// The codec can be nil in the GET request case; that's okay: when failed
// is non-nil, codec is never used.
Expand Down Expand Up @@ -189,6 +191,7 @@ func (h *tripleHandler) NewConn(
marshaler: tripleUnaryMarshaler{
writer: responseWriter,
codec: codec,
backupCodec: backupCodec,
compressMinBytes: h.CompressMinBytes,
compressionName: responseCompression,
compressionPool: h.CompressionPools.Get(responseCompression),
Expand All @@ -199,6 +202,7 @@ func (h *tripleHandler) NewConn(
unmarshaler: tripleUnaryUnmarshaler{
reader: requestBody,
codec: codec,
backupCodec: backupCodec,
compressionPool: h.CompressionPools.Get(requestCompression),
bufferPool: h.BufferPool,
readMaxBytes: h.ReadMaxBytes,
Expand Down Expand Up @@ -476,6 +480,7 @@ func (hc *tripleUnaryHandlerConn) writeResponseHeader(err error) {
type tripleUnaryMarshaler struct {
writer io.Writer
codec Codec
backupCodec Codec
compressMinBytes int
compressionName string
compressionPool *compressionPool
Expand All @@ -490,7 +495,13 @@ func (m *tripleUnaryMarshaler) Marshal(message interface{}) *Error {
}
data, err := m.codec.Marshal(message)
if err != nil {
return errorf(CodeInternal, "marshal message: %w", err)
if m.codec.Name() != m.backupCodec.Name() {
logger.Warnf("failed to marshal message with codec %s, trying alternative codec %s", m.codec.Name(), m.backupCodec.Name())
data, err = m.backupCodec.Marshal(message)
}
if err != nil {
return errorf(CodeInternal, "marshal message: %w", err)
}
}
// Can't avoid allocating the slice, but we can reuse it.
uncompressed := bytes.NewBuffer(data)
Expand Down Expand Up @@ -537,14 +548,22 @@ func (m *tripleUnaryRequestMarshaler) Marshal(message interface{}) *Error {
type tripleUnaryUnmarshaler struct {
reader io.Reader
codec Codec
backupCodec Codec //backupCodec is for the situation when content-type mismatches with the expected codec
compressionPool *compressionPool
bufferPool *bufferPool
alreadyRead bool
readMaxBytes int
}

func (u *tripleUnaryUnmarshaler) Unmarshal(message interface{}) *Error {
return u.UnmarshalFunc(message, u.codec.Unmarshal)
err := u.UnmarshalFunc(message, u.codec.Unmarshal)
if err != nil {
if u.codec.Name() != u.backupCodec.Name() {
logger.Warnf("failed to unmarshal message with codec %s, trying alternative codec %s", u.codec.Name(), u.backupCodec.Name())
err = u.UnmarshalFunc(message, u.codec.Unmarshal)
}
}
return err
}

func (u *tripleUnaryUnmarshaler) UnmarshalFunc(message interface{}, unmarshal func([]byte, interface{}) error) *Error {
Expand Down

0 comments on commit 295cb9b

Please sign in to comment.