Skip to content

Commit

Permalink
Refactor the call protocol (#23)
Browse files Browse the repository at this point in the history
Remaining response bytes returned by the call protocol is used now only to generate an error. But the call protocol is able to do it by itself so its interface may be simplified what was done in this PR.

Additionally, all call protocol tests were refactored to use regular mocks and to be more accurate.
  • Loading branch information
vykulakov committed Dec 27, 2019
1 parent 19cbc0f commit f335aaa
Show file tree
Hide file tree
Showing 6 changed files with 141 additions and 113 deletions.
32 changes: 25 additions & 7 deletions call_protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@ package avroipc
import (
"bytes"
"fmt"

"github.com/linkedin/goavro"
)

type CallProtocol interface {
PrepareRequest(method string, datum interface{}) ([]byte, error)
ParseResponse(method string, responseBytes []byte) (interface{}, []byte, error)
ParseResponse(method string, responseBytes []byte) (interface{}, error)
}

// The Avro Call format implementation for the Avro RPC protocol.
Expand Down Expand Up @@ -78,26 +79,43 @@ func (p *сallProtocol) PrepareRequest(method string, datum interface{}) ([]byte
return buf.Bytes(), nil
}

func (p *сallProtocol) ParseResponse(method string, responseBytes []byte) (interface{}, []byte, error) {
func (p *сallProtocol) ParseResponse(method string, responseBytes []byte) (interface{}, error) {
meta, responseBytes, err := p.metaCodec.NativeFromBinary(responseBytes)
if err != nil {
return nil, responseBytes, err
return nil, err
}
_ = meta

flag, responseBytes, err := p.booleanCodec.NativeFromBinary(responseBytes)
if err != nil {
return nil, responseBytes, err
return nil, err
}
flagBool, ok := flag.(bool)
if !ok {
return nil, responseBytes, fmt.Errorf("cannot convert error flag to boolean: %v", flag)
return nil, fmt.Errorf("cannot convert error flag to boolean: %v", flag)
}

if flagBool {
responseBytes, err = p.proto.ParseError(method, responseBytes)
return nil, responseBytes, err
if err != nil {
return nil, err
}
return nil, p.checkResponseBytes(responseBytes)
}

message, responseBytes, err := p.proto.ParseMessage(method, responseBytes)
if err != nil {
return nil, err
}

return message, p.checkResponseBytes(responseBytes)
}

func (p *сallProtocol) checkResponseBytes(b []byte) error {
n := len(b)
if n > 0 {
return fmt.Errorf("response buffer is not empty: len=%d, rest=0x%X", n, b)
}

return p.proto.ParseMessage(method, responseBytes)
return nil
}
151 changes: 100 additions & 51 deletions call_protocol_test.go
Original file line number Diff line number Diff line change
@@ -1,90 +1,139 @@
package avroipc_test

import (
"fmt"
"errors"
"testing"

"github.com/myzhan/avroipc"
"github.com/myzhan/avroipc/mocks"
"github.com/stretchr/testify/require"
"testing"
)

func TestCallProtocol_PrepareRequest(t *testing.T) {
p, err := avroipc.NewCallProtocol(&mocks.MockProtocol{})
func prepareCallProtocol(t *testing.T) (avroipc.CallProtocol, *mocks.MockProtocol) {
m := &mocks.MockProtocol{}

p, err := avroipc.NewCallProtocol(m)
require.NoError(t, err)

return p, m
}

func TestCallProtocol_PrepareRequest(t *testing.T) {
datum := []byte{0xA, 0xB, 0xC}
message := []byte{0xD, 0xE, 0xF}
emptyMethod := ""
appendMethod := "append"

nilError := error(nil)
testError := errors.New("test error")

t.Run("empty method", func(t *testing.T) {
actual, err := p.PrepareRequest("", nil)
p, m := prepareCallProtocol(t)
m.On("PrepareMessage", emptyMethod, datum).Return(message, nilError).Once()

actual, err := p.PrepareRequest(emptyMethod, datum)
require.NoError(t, err)
require.Equal(t, []byte{0x0, 0x0}, actual)
require.Equal(t, []byte{0x0, 0x0, 0xD, 0xE, 0xF}, actual)

m.AssertExpectations(t)
})
t.Run("append method", func(t *testing.T) {
actual, err := p.PrepareRequest("append", nil)
p, m := prepareCallProtocol(t)
m.On("PrepareMessage", appendMethod, datum).Return(message, nilError).Once()

actual, err := p.PrepareRequest(appendMethod, datum)
require.NoError(t, err)
require.Equal(t, []byte{0x0, 0xc, 0x61, 0x70, 0x70, 0x65, 0x6e, 0x64}, actual)
require.Equal(t, []byte{0x0, 0xc, 0x61, 0x70, 0x70, 0x65, 0x6e, 0x64, 0xD, 0xE, 0xF}, actual)

m.AssertExpectations(t)
})
t.Run("protocol error", func(t *testing.T) {
p, err := avroipc.NewCallProtocol(&mocks.MockProtocol{
Err: fmt.Errorf("test error"),
})
require.NoError(t, err)
p, m := prepareCallProtocol(t)
m.On("PrepareMessage", "append", datum).Return(message, testError).Once()

_, err = p.PrepareRequest("append", nil)
_, err := p.PrepareRequest("append", datum)
require.EqualError(t, err, "test error")

m.AssertExpectations(t)
})
}

func TestCallProtocol_ParseResponse(t *testing.T) {
status := "test status"
method := "test method"

rest := []byte{}
data := append([]byte{0xA, 0xB, 0xC}, rest...)

longRest := []byte{0xD, 0xE, 0xF}
longData := append([]byte{}, longRest...)

okResponse := append([]byte{0x0, 0x0}, data...)
badResponse := append([]byte{0x0, 0x2}, data...)
longResponse := append([]byte{0x0, 0x0}, longData...)
shortResponse := []byte{0x0}
errorResponse := append([]byte{0x0, 0x1}, data...)
errorLongResponse := append([]byte{0x0, 0x1}, longData...)

nilError := error(nil)
testError := errors.New("test error")

t.Run("success", func(t *testing.T) {
expected := "test"
p, err := avroipc.NewCallProtocol(&mocks.MockProtocol{
Response: expected,
ParseResponseBytes: []byte{0x77, 0x78},
})
require.NoError(t, err)
p, m := prepareCallProtocol(t)
m.On("ParseMessage", method, data).Return(status, rest, nilError).Once()

actual, bytes, err := p.ParseResponse("", []byte{0x0, 0x0})
actual, err := p.ParseResponse(method, okResponse)
require.NoError(t, err)
require.Equal(t, []byte{0x77, 0x78}, bytes)
require.Equal(t, expected, actual)
require.Equal(t, status, actual)

m.AssertExpectations(t)
})

t.Run("short buffer", func(t *testing.T) {
expected := "test"
p, err := avroipc.NewCallProtocol(&mocks.MockProtocol{
Response: expected,
ParseResponseBytes: []byte{0x77, 0x78},
})
require.NoError(t, err)
t.Run("bad flag", func(t *testing.T) {
p, m := prepareCallProtocol(t)

_, bytes, err := p.ParseResponse("", []byte{0x0})
require.EqualError(t, err, "short buffer")
require.Equal(t, []byte{}, bytes)
_, err := p.ParseResponse(method, badResponse)
require.EqualError(t, err, "cannot decode binary boolean: expected: Go byte(0) or byte(1); received: byte(2)")

m.AssertExpectations(t)
})

t.Run("bad flag", func(t *testing.T) {
expected := "test"
p, err := avroipc.NewCallProtocol(&mocks.MockProtocol{
Response: expected,
ParseResponseBytes: []byte{0x77, 0x78},
})
require.NoError(t, err)
t.Run("short buffer", func(t *testing.T) {
p, m := prepareCallProtocol(t)

_, err := p.ParseResponse(method, shortResponse)
require.EqualError(t, err, "short buffer")

_, bytes, err := p.ParseResponse("", []byte{0x0, 0x2})
require.Error(t, err)
require.Contains(t, err.Error(), "cannot decode binary boolean")
require.Equal(t, []byte{0x2}, bytes)
m.AssertExpectations(t)
})

t.Run("process error", func(t *testing.T) {
expected := fmt.Errorf("test error")
p, err := avroipc.NewCallProtocol(&mocks.MockProtocol{
Err: expected,
ErrorResponseBytes: []byte{0x88, 0x89},
})
require.NoError(t, err)
p, m := prepareCallProtocol(t)
m.On("ParseError", method, data).Return(rest, testError).Once()

_, bytes, err := p.ParseResponse("", []byte{0x0, 0x1})
_, err := p.ParseResponse(method, errorResponse)
require.EqualError(t, err, "test error")
require.Equal(t, []byte{0x88, 0x89}, bytes)

m.AssertExpectations(t)
})

t.Run("buffer not empty", func(t *testing.T) {
p, m := prepareCallProtocol(t)
m.On("ParseMessage", method, longData).Return(status, longRest, nilError).Once()

_, err := p.ParseResponse(method, longResponse)
require.EqualError(t, err, "response buffer is not empty: len=3, rest=0x0D0E0F")

m.AssertExpectations(t)
})

t.Run("process error with non-empty buffer", func(t *testing.T) {
p, m := prepareCallProtocol(t)
m.On("ParseError", method, longData).Return(longRest, nilError).Once()

_, err := p.ParseResponse(method, errorLongResponse)
require.EqualError(t, err, "response buffer is not empty: len=3, rest=0x0D0E0F")

m.AssertExpectations(t)
})
}
8 changes: 1 addition & 7 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,17 +135,11 @@ func (c *client) sendMessage(method string, datum interface{}) (string, error) {
return "", err
}

response, responseBytes, err := c.callProtocol.ParseResponse(method, responseBytes)
response, err := c.callProtocol.ParseResponse(method, responseBytes)
if err != nil {
return "", err
}

r := responseBytes
n := len(responseBytes)
if n > 0 {
return "", fmt.Errorf("response buffer is not empty: len=%d, rest=0x%X", n, r)
}

status, ok := response.(string)
if !ok {
return "", fmt.Errorf("cannot convert status to string: %v", response)
Expand Down
42 changes: 4 additions & 38 deletions client_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ func TestClient_Append(t *testing.T) {

request := []byte{0x0A, 0x0B}
response := []byte{0x1A, 0x1B}
remaining := []byte{0x2A, 0x2B}

origEvent := &Event{Headers: map[string]string{}, Body: []byte("test body")}
prepEvent := origEvent.toMap()
Expand All @@ -87,7 +86,7 @@ func TestClient_Append(t *testing.T) {
f.On("Write", request).Return(nil)
x.On("Flush").Return(nil).Once()
f.On("Read").Return(response, nil)
p.On("ParseResponse", method, response).Return("SOME", []byte{}, nil)
p.On("ParseResponse", method, response).Return("SOME", nil)

status, err := c.Append(origEvent)
require.NoError(t, err)
Expand All @@ -103,38 +102,21 @@ func TestClient_Append(t *testing.T) {
f.On("Write", request).Return(nil).Once()
x.On("Flush").Return(nil).Once()
f.On("Read").Return(response, nil).Once()
p.On("ParseResponse", method, response).Return(0, []byte{}, nil).Once()
p.On("ParseResponse", method, response).Return(0, nil).Once()

status, err := c.Append(origEvent)
require.EqualError(t, err, "cannot convert status to string: 0")
require.Equal(t, "", status)
p.AssertExpectations(t)
f.AssertExpectations(t)
})

t.Run("non-empty response buffer", func(t *testing.T) {
c, x, f, p, _ := prepare()

p.On("PrepareRequest", method, prepEvent).Return(request, nil).Once()
f.On("Write", request).Return(nil).Once()
x.On("Flush").Return(nil).Once()
f.On("Read").Return(response, nil).Once()
p.On("ParseResponse", method, response).Return("SOME", remaining, nil).Once()

status, err := c.Append(origEvent)
require.EqualError(t, err, "response buffer is not empty: len=2, rest=0x2A2B")
require.Equal(t, "", status)
p.AssertExpectations(t)
f.AssertExpectations(t)
})
}

func TestClient_AppendBatch(t *testing.T) {
method := "appendBatch"

request := []byte{0x0A, 0x0B}
response := []byte{0x1A, 0x1B}
remaining := []byte{0x2A, 0x2B}

origEvents := []*Event{
{Headers: map[string]string{}, Body: []byte("test body 1")},
Expand All @@ -152,7 +134,7 @@ func TestClient_AppendBatch(t *testing.T) {
f.On("Write", request).Return(nil)
x.On("Flush").Return(nil).Once()
f.On("Read").Return(response, nil)
p.On("ParseResponse", method, response).Return("SOME", []byte{}, nil)
p.On("ParseResponse", method, response).Return("SOME", nil)

status, err := c.AppendBatch(origEvents)
require.NoError(t, err)
Expand All @@ -168,30 +150,14 @@ func TestClient_AppendBatch(t *testing.T) {
f.On("Write", request).Return(nil).Once()
x.On("Flush").Return(nil).Once()
f.On("Read").Return(response, nil).Once()
p.On("ParseResponse", method, response).Return(0, []byte{}, nil).Once()
p.On("ParseResponse", method, response).Return(0, nil).Once()

status, err := c.AppendBatch(origEvents)
require.EqualError(t, err, "cannot convert status to string: 0")
require.Equal(t, "", status)
p.AssertExpectations(t)
f.AssertExpectations(t)
})

t.Run("non-empty response buffer", func(t *testing.T) {
c, x, f, p, _ := prepare()

p.On("PrepareRequest", method, prepEvents).Return(request, nil).Once()
f.On("Write", request).Return(nil).Once()
x.On("Flush").Return(nil).Once()
f.On("Read").Return(response, nil).Once()
p.On("ParseResponse", method, response).Return("SOME", remaining, nil).Once()

status, err := c.AppendBatch(origEvents)
require.EqualError(t, err, "response buffer is not empty: len=2, rest=0x2A2B")
require.Equal(t, "", status)
p.AssertExpectations(t)
f.AssertExpectations(t)
})
}

func TestClient_Close(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions mocks/call_protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ func (p *MockCallProtocol) PrepareRequest(method string, datum interface{}) ([]b
return args.Get(0).([]byte), args.Error(1)
}

func (p *MockCallProtocol) ParseResponse(method string, responseBytes []byte) (interface{}, []byte, error) {
func (p *MockCallProtocol) ParseResponse(method string, responseBytes []byte) (interface{}, error) {
args := p.Called(method, responseBytes)
return args.Get(0), args.Get(1).([]byte), args.Error(2)
return args.Get(0), args.Error(1)
}
Loading

0 comments on commit f335aaa

Please sign in to comment.