Skip to content

Commit

Permalink
Add a compression transport, close #11 (#12)
Browse files Browse the repository at this point in the history
  • Loading branch information
vykulakov committed Dec 21, 2019
1 parent a9d0f7f commit 99c4b9f
Show file tree
Hide file tree
Showing 10 changed files with 256 additions and 21 deletions.
9 changes: 8 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,19 @@ go get
go test ./...
```

To run test with a real client run the following command:
To run a test with a real client run the following command:
```bash
FLUME_SERVER_ADDRESS=127.0.0.1:20201 go test -count=1 -run TestSend
```
where `127.0.0.1:20201` is a real Apache Flume server, `-count=1` is a way to disable Go build cache.

If you want to run a test with a real client and enabled data compression run the following command:
```bash
FLUME_SERVER_ADDRESS=127.0.0.1:20201 FLUME_COMPRESSION_LEVEL=1 go test -count=1 -run TestSend
```
where `FLUME_COMPRESSION_LEVEL` is a new environment variable to specify wanted compression level.
Support values from `1` to `9`.

## License

Open source licensed under the MIT license (see _LICENSE_ file for details).
15 changes: 14 additions & 1 deletion client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,24 @@ type Client interface {
}

type client struct {
transport Transport
framingLayer FramingLayer
callProtocol CallProtocol
handshakeProtocol HandshakeProtocol
}

// NewClient creates an avro Client, and connect to addr immediately
func NewClient(addr string) (Client, error) {
func NewClient(addr string, compressionLevel int) (Client, error) {
trans, err := NewSocket(addr)
if err != nil {
return nil, err
}
if compressionLevel > 0 {
trans, err = NewZlibTransport(trans, compressionLevel)
if err != nil {
return nil, err
}
}
err = trans.Open()
if err != nil {
return nil, err
Expand All @@ -40,6 +47,7 @@ func NewClientWithTrans(trans Transport, proto MessageProtocol) (Client, error)
c := &client{}
var err error

c.transport = trans
c.framingLayer = NewFramingLayer(trans)

c.callProtocol, err = NewCallProtocol(proto)
Expand All @@ -66,6 +74,11 @@ func (c *client) send(request []byte) ([]byte, error) {
return nil, err
}

err = c.transport.Flush()
if err != nil {
return nil, err
}

response, err := c.framingLayer.Read()
if err != nil {
return nil, err
Expand Down
34 changes: 22 additions & 12 deletions client_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,25 +8,27 @@ import (
"github.com/stretchr/testify/require"
)

func prepare() (*client, *mocks.MockFramingLayer, *mocks.MockCallProtocol, *mocks.MockHandshakeProtocol) {
func prepare() (*client, *mocks.MockTransport, *mocks.MockFramingLayer, *mocks.MockCallProtocol, *mocks.MockHandshakeProtocol) {
t := &mocks.MockTransport{}
f := &mocks.MockFramingLayer{}
p := &mocks.MockCallProtocol{}
h := &mocks.MockHandshakeProtocol{}

c := &client{
transport: t,
framingLayer: f,
callProtocol: p,
handshakeProtocol: h,
}

return c, f, p, h
return c, t, f, p, h
}

func TestClient_handshake(t *testing.T) {
testErr := errors.New("test error")

t.Run("succeed", func(t *testing.T) {
c, f, _, h := prepare()
c, x, f, _, h := prepare()

request1 := []byte{0x0A, 0x0B}
request2 := []byte{0x1A, 0x1B}
Expand All @@ -36,12 +38,14 @@ func TestClient_handshake(t *testing.T) {
// The first handshake request: emulate an unknown client protocol
h.On("PrepareRequest").Return(request1, nil).Once()
f.On("Write", request1).Return(nil).Once()
x.On("Flush").Return(nil).Once()
f.On("Read").Return(response1, nil).Once()
h.On("ProcessResponse", response1).Return(true, nil).Once()

// The second handshake request: the server already knows the client protocol
h.On("PrepareRequest").Return(request2, nil).Once()
f.On("Write", request2).Return(nil).Once()
x.On("Flush").Return(nil).Once()
f.On("Read").Return(response2, nil).Once()
h.On("ProcessResponse", response2).Return(false, nil).Once()

Expand All @@ -52,7 +56,7 @@ func TestClient_handshake(t *testing.T) {
})

t.Run("preparing request failed", func(t *testing.T) {
c, f, _, h := prepare()
c, _, f, _, h := prepare()

request := []byte{}

Expand All @@ -77,10 +81,11 @@ func TestClient_Append(t *testing.T) {
prepEvent := origEvent.toMap()

t.Run("succeed", func(t *testing.T) {
c, f, p, _ := prepare()
c, x, f, p, _ := prepare()

p.On("PrepareRequest", method, prepEvent).Return(request, nil)
f.On("Write", request).Return(nil)
x.On("Flush").Return(nil).Once()
f.On("Read").Return(response, nil)
p.On("ParseResponse", method, response).Return("SOME", []byte{}, nil)

Expand All @@ -92,10 +97,11 @@ func TestClient_Append(t *testing.T) {
})

t.Run("incorrect status type", func(t *testing.T) {
c, f, p, _ := prepare()
c, x, f, p, _ := prepare()

p.On("PrepareRequest", method, prepEvent).Return(request, nil).Once()
f.On("Write", request).Return(nil).Once()
x.On("Flush").Return(nil).Once()
f.On("Read").Return(response, nil).Once()
p.On("ParseResponse", method, response).Return(0, []byte{}, nil).Once()

Expand All @@ -107,10 +113,11 @@ func TestClient_Append(t *testing.T) {
})

t.Run("non-empty response buffer", func(t *testing.T) {
c, f, p, _ := prepare()
c, x, f, p, _ := prepare()

p.On("PrepareRequest", method, prepEvent).Return(request, nil).Once()
f.On("Write", request).Return(nil).Once()
x.On("Flush").Return(nil).Once()
f.On("Read").Return(response, nil).Once()
p.On("ParseResponse", method, response).Return("SOME", remaining, nil).Once()

Expand Down Expand Up @@ -139,10 +146,11 @@ func TestClient_AppendBatch(t *testing.T) {
}

t.Run("succeed", func(t *testing.T) {
c, f, p, _ := prepare()
c, x, f, p, _ := prepare()

p.On("PrepareRequest", method, prepEvents).Return(request, nil)
f.On("Write", request).Return(nil)
x.On("Flush").Return(nil).Once()
f.On("Read").Return(response, nil)
p.On("ParseResponse", method, response).Return("SOME", []byte{}, nil)

Expand All @@ -154,10 +162,11 @@ func TestClient_AppendBatch(t *testing.T) {
})

t.Run("incorrect status type", func(t *testing.T) {
c, f, p, _ := prepare()
c, x, f, p, _ := prepare()

p.On("PrepareRequest", method, prepEvents).Return(request, nil).Once()
f.On("Write", request).Return(nil).Once()
x.On("Flush").Return(nil).Once()
f.On("Read").Return(response, nil).Once()
p.On("ParseResponse", method, response).Return(0, []byte{}, nil).Once()

Expand All @@ -169,10 +178,11 @@ func TestClient_AppendBatch(t *testing.T) {
})

t.Run("non-empty response buffer", func(t *testing.T) {
c, f, p, _ := prepare()
c, x, f, p, _ := prepare()

p.On("PrepareRequest", method, prepEvents).Return(request, nil).Once()
f.On("Write", request).Return(nil).Once()
x.On("Flush").Return(nil).Once()
f.On("Read").Return(response, nil).Once()
p.On("ParseResponse", method, response).Return("SOME", remaining, nil).Once()

Expand All @@ -188,7 +198,7 @@ func TestClient_Close(t *testing.T) {
testErr := errors.New("test error")

t.Run("succeed", func(t *testing.T) {
c, f, _, _ := prepare()
c, _, f, _, _ := prepare()

f.On("Close").Return(nil)

Expand All @@ -198,7 +208,7 @@ func TestClient_Close(t *testing.T) {
})

t.Run("framing layer error", func(t *testing.T) {
c, f, _, _ := prepare()
c, _, f, _, _ := prepare()

f.On("Close").Return(testErr)

Expand Down
11 changes: 10 additions & 1 deletion ipc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package avroipc
import (
"log"
"os"
"strconv"
"testing"

"github.com/stretchr/testify/require"
Expand All @@ -16,8 +17,16 @@ func TestSend(t *testing.T) {
t.Skip("The FLUME_SERVER_ADDRESS environment variable is not set")
}

level := os.Getenv("FLUME_COMPRESSION_LEVEL")
levelInt := 0
if level != "" {
var err error
levelInt, err = strconv.Atoi(level)
require.NoError(t, err)
}

// flume avro instance address
client, err := NewClient(addr)
client, err := NewClient(addr, levelInt)
require.NoError(t, err)

event := &Event{
Expand Down
17 changes: 11 additions & 6 deletions mocks/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,16 @@ type MockTransport struct {
mock.Mock
}

func (t *MockTransport) Open() error {
args := t.Called()
return args.Error(0)
}

func (t *MockTransport) Close() error {
args := t.Called()
return args.Error(0)
}

func (t *MockTransport) Read(p []byte) (n int, err error) {
args := t.Called(p)
return args.Int(0), args.Error(1)
Expand All @@ -18,12 +28,7 @@ func (t *MockTransport) Write(p []byte) (n int, err error) {
return args.Int(0), args.Error(1)
}

func (t *MockTransport) Close() error {
args := t.Called()
return args.Error(0)
}

func (t *MockTransport) Open() error {
func (t *MockTransport) Flush() error {
args := t.Called()
return args.Error(0)
}
4 changes: 4 additions & 0 deletions socket.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,7 @@ func (s *socket) Write(buf []byte) (int, error) {

return s.conn.Write(buf)
}

func (s *socket) Flush() error {
return nil
}
8 changes: 8 additions & 0 deletions socket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,14 @@ func TestSocket(t *testing.T) {
require.NoError(t, trans.Close())
})

t.Run("flush", func(t *testing.T) {
trans, err := avroipc.NewSocket("")
require.NoError(t, err)

err = trans.Flush()
require.NoError(t, err)
})

t.Run("success", func(t *testing.T) {
addr, clean := runServer(t)

Expand Down
1 change: 1 addition & 0 deletions transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,5 @@ type Transport interface {
io.ReadWriteCloser

Open() error
Flush() error
}
71 changes: 71 additions & 0 deletions zlib_transport.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package avroipc

import (
"compress/zlib"
"io"
)

type zlibTransport struct {
r io.ReadCloser
w *zlib.Writer
trans Transport
}

func NewZlibTransport(trans Transport, level int) (Transport, error) {
w, err := zlib.NewWriterLevel(trans, level)
if err != nil {
return nil, err
}

return &zlibTransport{
w: w,
trans: trans,
}, nil
}

func (t *zlibTransport) Open() error {
return t.trans.Open()
}

func (t *zlibTransport) Close() error {
if t.r != nil {
err := t.r.Close()
if err != nil {
return err
}
}

err := t.w.Close()
if err != nil {
return err
}

return t.trans.Close()
}

func (t *zlibTransport) Read(p []byte) (int, error) {
// Use lazy initialization of a reader because it immediately starts reading a header
// so may hang if there is no data in the underlying transport
if t.r == nil {
r, err := zlib.NewReader(t.trans)
if err != nil {
return 0, err
}
t.r = r
}

return t.r.Read(p)
}

func (t *zlibTransport) Write(p []byte) (int, error) {
return t.w.Write(p)
}

func (t *zlibTransport) Flush() error {
err := t.w.Flush()
if err != nil {
return err
}

return t.trans.Flush()
}
Loading

0 comments on commit 99c4b9f

Please sign in to comment.