Skip to content

Commit

Permalink
Refactor client (#7)
Browse files Browse the repository at this point in the history
What was done in this commit:
* Tests for the client were added.
* Tests for the real client were extended a little bit.
* Client interface was added to simplify usage of the client.
* Client logging was removed because it is not really useful.
* New client constructor was added to support more transports in the future.
* The client close method was added because an underlying transport already supports closing.
  • Loading branch information
vykulakov committed Dec 19, 2019
1 parent c5deed0 commit dacb799
Show file tree
Hide file tree
Showing 7 changed files with 269 additions and 69 deletions.
100 changes: 45 additions & 55 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,108 +2,94 @@ package avroipc

import (
"fmt"
"github.com/sirupsen/logrus"
)

// Client acts as an avro client
type Client struct {
serial int64
logger *logrus.Entry
type Client interface {
Close() error
Append(event *Event) (string, error)
}

connection Transport
type client struct {
framingLayer FramingLayer
callProtocol CallProtocol
handshakeProtocol HandshakeProtocol
}

// NewClient creates an avro Client, and connect to addr immediately
func NewClient(addr string) (client *Client, err error) {
client = &Client{}

client.logger = logrus.WithFields(logrus.Fields{
"name": "AvroFlumeClient",
})
client.logger.Debug("created")

client.connection, err = NewSocket(addr)
func NewClient(addr string) (Client, error) {
trans, err := NewSocket(addr)
if err != nil {
return nil, err
}

client.framingLayer = NewFramingLayer(client.connection)

proto, err := NewAvroSourceProtocol()
if err != nil {
return nil, err
}
client.callProtocol, err = NewCallProtocol(proto)
err = trans.Open()
if err != nil {
return nil, err
}

client.handshakeProtocol, err = NewHandshakeProtocol()
proto, err := NewAvroSourceProtocol()
if err != nil {
return nil, err
}

err = client.connect()
return NewClientWithTrans(trans, proto)
}

func NewClientWithTrans(trans Transport, proto MessageProtocol) (Client, error) {
c := &client{}
var err error

c.framingLayer = NewFramingLayer(trans)

c.callProtocol, err = NewCallProtocol(proto)
if err != nil {
return nil, err
}

return client, nil
}

func (client *Client) connect() (err error) {
err = client.connection.Open()
c.handshakeProtocol, err = NewHandshakeProtocol()
if err != nil {
return err
return nil, err
}

// first connect, need handshake
err = client.handshake()
err = c.handshake()
if err != nil {
return err
return nil, err
}

return nil
return c, nil
}

func (client *Client) send(request []byte) ([]byte, error) {
err := client.framingLayer.Write(request)
func (c *client) send(request []byte) ([]byte, error) {
err := c.framingLayer.Write(request)
if err != nil {
return nil, err
}
client.logger.WithField("size", len(request)).Debug("sent request")

response, err := client.framingLayer.Read()
response, err := c.framingLayer.Read()
if err != nil {
return nil, err
}
client.logger.WithField("size", len(response)).Debug("read response")

return response, nil
}

func (client *Client) handshake() error {
client.logger.Debug("start handshake")

request, err := client.handshakeProtocol.PrepareRequest()
func (c *client) handshake() error {
request, err := c.handshakeProtocol.PrepareRequest()
if err != nil {
return err
}

responseBytes, err := client.send(request)
responseBytes, err := c.send(request)
if err != nil {
return err
}

needResend, err := client.handshakeProtocol.ProcessResponse(responseBytes)
needResend, err := c.handshakeProtocol.ProcessResponse(responseBytes)
if err != nil {
return err
}
if needResend {
err = client.handshake()
err = c.handshake()
if err != nil {
return err
}
Expand All @@ -113,29 +99,29 @@ func (client *Client) handshake() error {
}

// Append sends event to flume
func (client *Client) Append(event *Event) (string, error) {
func (c *client) Append(event *Event) (string, error) {
datum := event.toMap()
method := "append"

request, err := client.callProtocol.PrepareRequest(method, datum)
request, err := c.callProtocol.PrepareRequest(method, datum)
if err != nil {
return "", err
}

responseBytes, err := client.send(request)
responseBytes, err := c.send(request)
if err != nil {
return "", err
}

response, responseBytes, err := client.callProtocol.ParseResponse(method, responseBytes)
response, responseBytes, err := c.callProtocol.ParseResponse(method, responseBytes)
if err != nil {
return "", err
}
if len(responseBytes) > 0 {
client.logger.WithFields(logrus.Fields{
"length": len(responseBytes),
"buffer": responseBytes,
}).Errorf("response buffer is not empty")

r := responseBytes
n := len(responseBytes)
if n > 0 {
return "", fmt.Errorf("response buffer is not empty: len=%d, rest=0x%X", n, r)
}

status, ok := response.(string)
Expand All @@ -145,3 +131,7 @@ func (client *Client) Append(event *Event) (string, error) {

return status, nil
}

func (c *client) Close() error {
return c.framingLayer.Close()
}
146 changes: 146 additions & 0 deletions client_internal_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
package avroipc

import (
"errors"
"github.com/myzhan/avroipc/mocks"
"github.com/stretchr/testify/require"
"testing"
)

func prepare() (*client, *mocks.MockFramingLayer, *mocks.MockCallProtocol, *mocks.MockHandshakeProtocol) {
f := &mocks.MockFramingLayer{}
p := &mocks.MockCallProtocol{}
h := &mocks.MockHandshakeProtocol{}

c := &client{
framingLayer: f,
callProtocol: p,
handshakeProtocol: h,
}

return c, f, p, h
}

func TestClient_handshake(t *testing.T) {
testErr := errors.New("test error")

t.Run("succeed", func(t *testing.T) {
c, f, _, h := prepare()

request1 := []byte{0x0A, 0x0B}
request2 := []byte{0x1A, 0x1B}
response1 := []byte{0x2A, 0x2B}
response2 := []byte{0x3A, 0x3B}

// The first handshake request: emulate an unknown client protocol
h.On("PrepareRequest").Return(request1, nil).Once()
f.On("Write", request1).Return(nil).Once()
f.On("Read").Return(response1, nil).Once()
h.On("ProcessResponse", response1).Return(true, nil).Once()

// The second handshake request: the server already knows the client protocol
h.On("PrepareRequest").Return(request2, nil).Once()
f.On("Write", request2).Return(nil).Once()
f.On("Read").Return(response2, nil).Once()
h.On("ProcessResponse", response2).Return(false, nil).Once()

err := c.handshake()
require.NoError(t, err)
h.AssertExpectations(t)
f.AssertExpectations(t)
})

t.Run("preparing request failed", func(t *testing.T) {
c, f, _, h := prepare()

request := []byte{}

// The first handshake request: emulate an unknown client protocol
h.On("PrepareRequest").Return(request, testErr).Once()

err := c.handshake()
require.EqualError(t, err, "test error")
h.AssertExpectations(t)
f.AssertExpectations(t)
})
}

func TestClient_Append(t *testing.T) {
method := "append"

request := []byte{0x0A, 0x0B}
response := []byte{0x1A, 0x1B}
remaining := []byte{0x2A, 0x2B}

origEvent := &Event{headers: map[string]string{}, body: []byte("test body")}
prepEvent := origEvent.toMap()

t.Run("succeed", func(t *testing.T) {
c, f, p, _ := prepare()

p.On("PrepareRequest", method, prepEvent).Return(request, nil)
f.On("Write", request).Return(nil)
f.On("Read").Return(response, nil)
p.On("ParseResponse", method, response).Return("SOME", []byte{}, nil)

status, err := c.Append(origEvent)
require.NoError(t, err)
require.Equal(t, "SOME", status)
p.AssertExpectations(t)
f.AssertExpectations(t)
})

t.Run("incorrect status type", func(t *testing.T) {
c, f, p, _ := prepare()

p.On("PrepareRequest", method, prepEvent).Return(request, nil).Once()
f.On("Write", request).Return(nil).Once()
f.On("Read").Return(response, nil).Once()
p.On("ParseResponse", method, response).Return(0, []byte{}, nil).Once()

status, err := c.Append(origEvent)
require.EqualError(t, err, "cannot convert status to string: 0")
require.Equal(t, "", status)
p.AssertExpectations(t)
f.AssertExpectations(t)
})

t.Run("non-empty response buffer", func(t *testing.T) {
c, f, p, _ := prepare()

p.On("PrepareRequest", method, prepEvent).Return(request, nil).Once()
f.On("Write", request).Return(nil).Once()
f.On("Read").Return(response, nil).Once()
p.On("ParseResponse", method, response).Return("SOME", remaining, nil).Once()

status, err := c.Append(origEvent)
require.EqualError(t, err, "response buffer is not empty: len=2, rest=0x2A2B")
require.Equal(t, "", status)
p.AssertExpectations(t)
f.AssertExpectations(t)
})
}

func TestClient_Close(t *testing.T) {
testErr := errors.New("test error")

t.Run("succeed", func(t *testing.T) {
c, f, _, _ := prepare()

f.On("Close").Return(nil)

err := c.Close()
require.NoError(t, err)
f.AssertExpectations(t)
})

t.Run("framing layer error", func(t *testing.T) {
c, f, _, _ := prepare()

f.On("Close").Return(testErr)

err := c.Close()
require.EqualError(t, err, "test error")
f.AssertExpectations(t)
})
}
8 changes: 0 additions & 8 deletions event.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,6 @@ type Event struct {
body []byte
}

// NewEvent creates an Event, which can be sent to flume
func NewEvent(headers map[string]string, body []byte) *Event {
return &Event{
headers: headers,
body: body,
}
}

func (e *Event) toMap() map[string]interface{} {
m := make(map[string]interface{})
m["headers"] = e.headers
Expand Down
24 changes: 18 additions & 6 deletions ipc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,26 @@ func TestSend(t *testing.T) {
client, err := NewClient(addr)
require.NoError(t, err)

headersMap := make(map[string]string)
headersMap["topic"] = "myzhan"
headersMap["timestamp"] = "1508740315478"
body := []byte("hello from go")
event := &Event{
body: []byte("hello from go"),
headers: map[string]string{
"topic": "myzhan",
"timestamp": "1508740315478",
},
}

event := NewEvent(headersMap, body)
var status string

status, err := client.Append(event)
// The first append call.
status, err = client.Append(event)
require.NoError(t, err)
require.Equal(t, "OK", status)

// The second append call.
status, err = client.Append(event)
require.NoError(t, err)
require.Equal(t, "OK", status)

// Close the client finally.
require.NoError(t, client.Close())
}
Loading

0 comments on commit dacb799

Please sign in to comment.