Skip to content

Commit

Permalink
Extract Flume client to a separate package
Browse files Browse the repository at this point in the history
refs #35

This is the first step to split the single client to independent Avro and Flume clients.
  • Loading branch information
vykulakov committed Feb 1, 2020
1 parent f4deb58 commit cc16073
Show file tree
Hide file tree
Showing 8 changed files with 249 additions and 149 deletions.
66 changes: 20 additions & 46 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@ import (
// An avro client implementation
type Client interface {
Close() error
Append(event *Event) (string, error)
AppendBatch(events []*Event) (string, error)
SendMessage(method string, datum interface{}) (string, error)
}

type client struct {
Expand All @@ -25,14 +24,6 @@ type client struct {
handshakeProtocol protocols.HandshakeProtocol
}

// NewClient creates an avro client with default option values and
// connects to the specified remote Flume endpoint immediately.
//
// Very useful for the testing purposes and to build simple examples.
func NewClient(addr string) (Client, error) {
return NewClientWithConfig(addr, NewConfig())
}

// NewClient creates an avro client with considering values of options from
// the passed configuration object and connects to the specified remote Flume
// endpoint immediately.
Expand Down Expand Up @@ -128,7 +119,25 @@ func (c *client) handshake() error {
return nil
}

func (c *client) sendMessage(method string, datum interface{}) (string, error) {
func (c *client) applyDeadline() error {
if c.sendTimeout > 0 {
d := time.Now().Add(c.sendTimeout)
return c.transport.SetDeadline(d)
}

return nil
}

func (c *client) Close() error {
err := c.applyDeadline()
if err != nil {
return err
}

return c.transport.Close()
}

func (c *client) SendMessage(method string, datum interface{}) (string, error) {
request, err := c.callProtocol.PrepareRequest(method, datum)
if err != nil {
return "", err
Expand All @@ -151,38 +160,3 @@ func (c *client) sendMessage(method string, datum interface{}) (string, error) {

return status, nil
}

func (c *client) applyDeadline() error {
if c.sendTimeout > 0 {
d := time.Now().Add(c.sendTimeout)
return c.transport.SetDeadline(d)
}

return nil
}

// Append sends event to flume
func (c *client) Append(event *Event) (string, error) {
datum := event.toMap()

return c.sendMessage("append", datum)
}

// Append sends events to flume
func (c *client) AppendBatch(events []*Event) (string, error) {
datum := make([]map[string]interface{}, 0)
for _, event := range events {
datum = append(datum, event.toMap())
}

return c.sendMessage("appendBatch", datum)
}

func (c *client) Close() error {
err := c.applyDeadline()
if err != nil {
return err
}

return c.transport.Close()
}
96 changes: 25 additions & 71 deletions client_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"testing"

"github.com/myzhan/avroipc/mocks"

"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -53,6 +54,7 @@ func TestClient_handshake(t *testing.T) {
require.NoError(t, err)
h.AssertExpectations(t)
f.AssertExpectations(t)
x.AssertExpectations(t)
})

t.Run("preparing request failed", func(t *testing.T) {
Expand All @@ -70,116 +72,68 @@ func TestClient_handshake(t *testing.T) {
})
}

func TestClient_Append(t *testing.T) {
method := "append"

request := []byte{0x0A, 0x0B}
response := []byte{0x1A, 0x1B}

origEvent := &Event{Headers: map[string]string{}, Body: []byte("test body")}
prepEvent := origEvent.toMap()
func TestClient_Close(t *testing.T) {
testErr := errors.New("test error")

t.Run("succeed", func(t *testing.T) {
c, x, f, p, _ := prepare()
c, x, _, _, _ := prepare()

p.On("PrepareRequest", method, prepEvent).Return(request, nil)
f.On("Write", request).Return(nil)
x.On("Flush").Return(nil).Once()
f.On("Read").Return(response, nil)
p.On("ParseResponse", method, response).Return("SOME", nil)
x.On("Close").Return(nil).Once()

status, err := c.Append(origEvent)
err := c.Close()
require.NoError(t, err)
require.Equal(t, "SOME", status)
p.AssertExpectations(t)
f.AssertExpectations(t)
x.AssertExpectations(t)
})

t.Run("incorrect status type", func(t *testing.T) {
c, x, f, p, _ := prepare()
t.Run("framing layer error", func(t *testing.T) {
c, x, _, _, _ := prepare()

p.On("PrepareRequest", method, prepEvent).Return(request, nil).Once()
f.On("Write", request).Return(nil).Once()
x.On("Flush").Return(nil).Once()
f.On("Read").Return(response, nil).Once()
p.On("ParseResponse", method, response).Return(0, nil).Once()
x.On("Close").Return(testErr).Once()

status, err := c.Append(origEvent)
require.EqualError(t, err, "cannot convert status to string: 0")
require.Equal(t, "", status)
p.AssertExpectations(t)
f.AssertExpectations(t)
err := c.Close()
require.EqualError(t, err, "test error")
x.AssertExpectations(t)
})
}

func TestClient_AppendBatch(t *testing.T) {
method := "appendBatch"
func TestClient_SendMessage(t *testing.T) {
datum := "test data"
method := "append"

request := []byte{0x0A, 0x0B}
response := []byte{0x1A, 0x1B}

origEvents := []*Event{
{Headers: map[string]string{}, Body: []byte("test body 1")},
{Headers: map[string]string{}, Body: []byte("test body 2")},
}
prepEvents := []map[string]interface{}{
origEvents[0].toMap(),
origEvents[1].toMap(),
}

t.Run("succeed", func(t *testing.T) {
c, x, f, p, _ := prepare()

p.On("PrepareRequest", method, prepEvents).Return(request, nil)
f.On("Write", request).Return(nil)
p.On("PrepareRequest", method, datum).Return(request, nil).Once()
f.On("Write", request).Return(nil).Once()
x.On("Flush").Return(nil).Once()
f.On("Read").Return(response, nil)
p.On("ParseResponse", method, response).Return("SOME", nil)
f.On("Read").Return(response, nil).Once()
p.On("ParseResponse", method, response).Return("SOME", nil).Once()

status, err := c.AppendBatch(origEvents)
status, err := c.SendMessage(method, datum)
require.NoError(t, err)
require.Equal(t, "SOME", status)
p.AssertExpectations(t)
f.AssertExpectations(t)
x.AssertExpectations(t)
})

t.Run("incorrect status type", func(t *testing.T) {
c, x, f, p, _ := prepare()

p.On("PrepareRequest", method, prepEvents).Return(request, nil).Once()
p.On("PrepareRequest", method, datum).Return(request, nil).Once()
f.On("Write", request).Return(nil).Once()
x.On("Flush").Return(nil).Once()
f.On("Read").Return(response, nil).Once()
p.On("ParseResponse", method, response).Return(0, nil).Once()

status, err := c.AppendBatch(origEvents)
status, err := c.SendMessage(method, datum)
require.EqualError(t, err, "cannot convert status to string: 0")
require.Equal(t, "", status)
p.AssertExpectations(t)
f.AssertExpectations(t)
})
}

func TestClient_Close(t *testing.T) {
testErr := errors.New("test error")

t.Run("succeed", func(t *testing.T) {
c, x, _, _, _ := prepare()

x.On("Close").Return(nil)

err := c.Close()
require.NoError(t, err)
x.AssertExpectations(t)
})

t.Run("framing layer error", func(t *testing.T) {
c, x, _, _, _ := prepare()

x.On("Close").Return(testErr)

err := c.Close()
require.EqualError(t, err, "test error")
x.AssertExpectations(t)
})
}
59 changes: 59 additions & 0 deletions flume/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package flume

import (
"github.com/myzhan/avroipc"
)

// An avro client implementation
type Client interface {
Close() error
Append(event *Event) (string, error)
AppendBatch(events []*Event) (string, error)
}

type client struct {
client avroipc.Client
}

// NewClient creates an avro client with default option values and
// connects to the specified remote Flume endpoint immediately.
//
// Very useful for the testing purposes and to build simple examples.
func NewClient(addr string) (Client, error) {
return NewClientWithConfig(addr, avroipc.NewConfig())
}

// NewClient creates an avro client with considering values of options from
// the passed configuration object and connects to the specified remote Flume
// endpoint immediately.
//
// This constructor supposed to be used in production environments.
func NewClientWithConfig(addr string, config *avroipc.Config) (Client, error) {
c, err := avroipc.NewClientWithConfig(addr, config)
if err != nil {
return nil, err
}

return &client{c}, nil
}

// Append sends event to flume
func (c *client) Append(event *Event) (string, error) {
datum := event.toMap()

return c.client.SendMessage("append", datum)
}

// Append sends events to flume
func (c *client) AppendBatch(events []*Event) (string, error) {
datum := make([]map[string]interface{}, 0)
for _, event := range events {
datum = append(datum, event.toMap())
}

return c.client.SendMessage("appendBatch", datum)
}

func (c *client) Close() error {
return c.client.Close()
}
86 changes: 86 additions & 0 deletions flume/client_internal_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package flume

import (
"errors"
"testing"

"github.com/stretchr/testify/require"

"github.com/myzhan/avroipc/flume/mocks"
)

func prepare() (*client, *mocks.MockClient) {
x := &mocks.MockClient{}

c := &client{
client: x,
}

return c, x
}

func TestClient_Close(t *testing.T) {
testErr := errors.New("test error")

t.Run("succeed", func(t *testing.T) {
c, x := prepare()

x.On("Close").Return(nil).Once()

err := c.Close()
require.NoError(t, err)
x.AssertExpectations(t)
})

t.Run("client error", func(t *testing.T) {
c, x := prepare()

x.On("Close").Return(testErr).Once()

err := c.Close()
require.EqualError(t, err, "test error")
x.AssertExpectations(t)
})
}

func TestClient_Append(t *testing.T) {
method := "append"

origEvent := &Event{Headers: map[string]string{}, Body: []byte("test body")}
prepEvent := origEvent.toMap()

t.Run("succeed", func(t *testing.T) {
c, x := prepare()

x.On("SendMessage", method, prepEvent).Return("SOME", nil).Once()

status, err := c.Append(origEvent)
require.NoError(t, err)
require.Equal(t, "SOME", status)
x.AssertExpectations(t)
})
}

func TestClient_AppendBatch(t *testing.T) {
method := "appendBatch"

origEvents := []*Event{
{Headers: map[string]string{}, Body: []byte("test body 1")},
{Headers: map[string]string{}, Body: []byte("test body 2")},
}
prepEvents := []map[string]interface{}{
origEvents[0].toMap(),
origEvents[1].toMap(),
}

t.Run("succeed", func(t *testing.T) {
c, x := prepare()

x.On("SendMessage", method, prepEvents).Return("SOME", nil).Once()

status, err := c.AppendBatch(origEvents)
require.NoError(t, err)
require.Equal(t, "SOME", status)
x.AssertExpectations(t)
})
}
Loading

0 comments on commit cc16073

Please sign in to comment.