Skip to content

Commit

Permalink
Add buffered transport, close #13 (#14)
Browse files Browse the repository at this point in the history
  • Loading branch information
vykulakov committed Dec 22, 2019
1 parent 99c4b9f commit c5e86f6
Show file tree
Hide file tree
Showing 7 changed files with 332 additions and 22 deletions.
36 changes: 28 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,36 @@ Thanks to Linkedin's [goavro](https://github.com/linkedin/goavro)!
## Usage

```go
// flume avro instance address
client := NewClient("localhost:20200")
package main

headersMap := make(map[string]string)
headersMap["topic"] = "myzhan"
headersMap["timestamp"] = "1508740315478"
body := []byte("hello from go")
import (
"log"

event := NewEvent(headersMap, body)
client.Append(event)
"github.com/myzhan/avroipc"
)

func main() {
// flume avro instance address
client, err := avroipc.NewClient("localhost:20200", 1024, 6)
if err != nil {
log.Fatal(err)
}

event := &avroipc.Event{
Body: []byte("hello from go"),
Headers: map[string]string {
"topic": "myzhan",
"timestamp": "1508740315478",
},
}
status, err := client.Append(event)
if err != nil {
log.Fatal(err)
}
if status != "OK" {
log.Fatalf("Bad status: %s", status)
}
}
```

## Development
Expand Down
43 changes: 43 additions & 0 deletions buffered_transport.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package avroipc

import (
"bufio"
)

type bufferedTransport struct {
r *bufio.Reader
w *bufio.Writer
trans Transport
}

func NewBufferedTransport(trans Transport, bufferSize int) Transport {
return &bufferedTransport{
r: bufio.NewReaderSize(trans, bufferSize),
w: bufio.NewWriterSize(trans, bufferSize),
trans: trans,
}
}

func (p *bufferedTransport) Open() (err error) {
return p.trans.Open()
}

func (p *bufferedTransport) Close() (err error) {
return p.trans.Close()
}

func (p *bufferedTransport) Read(b []byte) (int, error) {
return p.r.Read(b)
}

func (p *bufferedTransport) Write(b []byte) (int, error) {
return p.w.Write(b)
}

func (p *bufferedTransport) Flush() error {
err := p.w.Flush()
if err != nil {
return err
}
return p.trans.Flush()
}
242 changes: 242 additions & 0 deletions buffered_transport_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,242 @@
package avroipc_test

import (
"fmt"
"github.com/myzhan/avroipc"
"github.com/myzhan/avroipc/mocks"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"testing"
)

func prepareBufferedTransport() (avroipc.Transport, *mocks.MockTransport) {
m := &mocks.MockTransport{}
b := avroipc.NewBufferedTransport(m, 8)

return b, m
}

func TestBufferedTransport_Open(t *testing.T) {
t.Run("succeed", func(t *testing.T) {
b, m := prepareBufferedTransport()

m.On("Open").Return(nil).Once()

err := b.Open()
require.NoError(t, err)
m.AssertExpectations(t)
})

t.Run("failed", func(t *testing.T) {
b, m := prepareBufferedTransport()

m.On("Open").Return(fmt.Errorf("test error")).Once()

err := b.Open()
require.EqualError(t, err, "test error")
m.AssertExpectations(t)
})
}

func TestBufferedTransport_Close(t *testing.T) {
t.Run("succeed", func(t *testing.T) {
b, m := prepareBufferedTransport()

m.On("Close").Return(nil).Once()

err := b.Close()
require.NoError(t, err)
m.AssertExpectations(t)
})

t.Run("failed", func(t *testing.T) {
b, m := prepareBufferedTransport()

m.On("Close").Return(fmt.Errorf("test error")).Once()

err := b.Close()
require.EqualError(t, err, "test error")
m.AssertExpectations(t)
})
}

func TestBufferedTransport_Read(t *testing.T) {
t.Run("few bytes", func(t *testing.T) {
d := []byte{0x1, 0x2, 0x3, 0x4}
x := []byte{0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0}
b, m := prepareBufferedTransport()

m.On("Read", x).Return(4, nil).Once().Run(func(args mock.Arguments) {
x := args[0].([]byte)
n := copy(x, d)
require.Equal(t, 4, n)
})

a := []byte{0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0}
e := []byte{0x1, 0x2, 0x3, 0x4, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0}
n, err := b.Read(a)
require.NoError(t, err)
require.Equal(t, 4, n)
require.Equal(t, e, a)
m.AssertExpectations(t)
})

t.Run("many bytes", func(t *testing.T) {
d := []byte{0x1, 0x2, 0x3, 0x4, 0x5, 0x6, 0x7, 0x8, 0x9, 0xa, 0xb, 0xc, 0xd, 0xe, 0xf, 0xf}
x := []byte{0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0}
b, m := prepareBufferedTransport()

m.On("Read", x).Return(16, nil).Once().Run(func(args mock.Arguments) {
x := args[0].([]byte)
n := copy(x, d)
require.Equal(t, 16, n)
})

a := []byte{0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0}
e := []byte{0x1, 0x2, 0x3, 0x4, 0x5, 0x6, 0x7, 0x8, 0x9, 0xa}
n, err := b.Read(a)
require.NoError(t, err)
require.Equal(t, 10, n)
require.Equal(t, e, a)
m.AssertExpectations(t)
})

t.Run("transport error", func(t *testing.T) {
d := []byte{0x1, 0x2, 0x3, 0x4}
x := []byte{0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0}
b, m := prepareBufferedTransport()

m.On("Read", x).Return(4, fmt.Errorf("test error")).Once().Run(func(args mock.Arguments) {
x := args[0].([]byte)
n := copy(x, d)
require.Equal(t, 4, n)
})

a := []byte{0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0}
e := []byte{0x1, 0x2, 0x3, 0x4, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0}
n, err := b.Read(a)
require.NoError(t, err)
require.Equal(t, 4, n)
require.Equal(t, e, a)

n, err = b.Read(a)
require.EqualError(t, err, "test error")
require.Equal(t, 0, n)
require.Equal(t, e, a)
m.AssertExpectations(t)
})
}

func TestBufferedTransport_Write(t *testing.T) {
t.Run("few bytes", func(t *testing.T) {
d := []byte{0x1, 0x2, 0x3, 0x4}
b, m := prepareBufferedTransport()

n, err := b.Write(d)
require.NoError(t, err)
require.Equal(t, 4, n)
m.AssertExpectations(t)
})

t.Run("many bytes", func(t *testing.T) {
d := []byte{0x1, 0x2, 0x3, 0x4, 0x5, 0x6, 0x7, 0x8, 0x9, 0xa}
b, m := prepareBufferedTransport()

m.On("Write", d).Return(4, nil).Once()

n, err := b.Write(d)
require.NoError(t, err)
require.Equal(t, 10, n)
m.AssertExpectations(t)
})

t.Run("some times by few bytes with flush", func(t *testing.T) {
d := []byte{0x1, 0x2, 0x3, 0x4, 0x5}
e1 := []byte{0x1, 0x2, 0x3, 0x4, 0x5, 0x1, 0x2, 0x3}
e2 := []byte{0x4, 0x5}
b, m := prepareBufferedTransport()

m.On("Write", e1).Return(8, nil).Once()
m.On("Write", e2).Return(2, nil).Once()
m.On("Flush").Return(nil).Once()

for i := 0; i < 2; i++ {
n, err := b.Write(d)
require.NoError(t, err)
require.Equal(t, 5, n)
}
err := b.Flush()
require.NoError(t, err)
m.AssertExpectations(t)
})

t.Run("transport error", func(t *testing.T) {
d := []byte{0x1, 0x2, 0x3, 0x4, 0x5}
e := []byte{0x1, 0x2, 0x3, 0x4, 0x5, 0x1, 0x2, 0x3}
b, m := prepareBufferedTransport()

m.On("Write", e).Return(10, fmt.Errorf("test error")).Once()

n, err := b.Write(d)
require.NoError(t, err)
require.Equal(t, 5, n)

n, err = b.Write(d)
require.EqualError(t, err, "test error")
require.Equal(t, 3, n)
m.AssertExpectations(t)
})
}

func TestBufferedTransport_Flush(t *testing.T) {
t.Run("empty buffer", func(t *testing.T) {
b, m := prepareBufferedTransport()

m.On("Flush").Return(nil).Once()

err := b.Flush()
require.NoError(t, err)
m.AssertExpectations(t)
})

t.Run("non-empty buffer", func(t *testing.T) {
d := []byte{0x1, 0x2, 0x3, 0x4}
b, m := prepareBufferedTransport()

m.On("Write", d).Return(4, nil).Once()
m.On("Flush").Return(nil).Once()

n, err := b.Write(d)
require.NoError(t, err)
require.Equal(t, 4, n)

err = b.Flush()
require.NoError(t, err)
m.AssertExpectations(t)
})

t.Run("buffer error", func(t *testing.T) {
d := []byte{0x1, 0x2, 0x3, 0x4}
b, m := prepareBufferedTransport()

m.On("Write", d).Return(4, fmt.Errorf("test error")).Once()

n, err := b.Write(d)
require.NoError(t, err)
require.Equal(t, 4, n)

err = b.Flush()
require.EqualError(t, err, "test error")
m.AssertExpectations(t)
})

t.Run("transport error", func(t *testing.T) {
b, m := prepareBufferedTransport()

m.On("Flush").Return(fmt.Errorf("test error")).Once()

err := b.Flush()
require.EqualError(t, err, "test error")
m.AssertExpectations(t)
})
}
5 changes: 4 additions & 1 deletion client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@ type client struct {
}

// NewClient creates an avro Client, and connect to addr immediately
func NewClient(addr string, compressionLevel int) (Client, error) {
func NewClient(addr string, bufferSize, compressionLevel int) (Client, error) {
trans, err := NewSocket(addr)
if err != nil {
return nil, err
}
if bufferSize > 0 {
trans = NewBufferedTransport(trans, bufferSize)
}
if compressionLevel > 0 {
trans, err = NewZlibTransport(trans, compressionLevel)
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions client_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func TestClient_Append(t *testing.T) {
response := []byte{0x1A, 0x1B}
remaining := []byte{0x2A, 0x2B}

origEvent := &Event{headers: map[string]string{}, body: []byte("test body")}
origEvent := &Event{Headers: map[string]string{}, Body: []byte("test body")}
prepEvent := origEvent.toMap()

t.Run("succeed", func(t *testing.T) {
Expand Down Expand Up @@ -137,8 +137,8 @@ func TestClient_AppendBatch(t *testing.T) {
remaining := []byte{0x2A, 0x2B}

origEvents := []*Event{
{headers: map[string]string{}, body: []byte("test body 1")},
{headers: map[string]string{}, body: []byte("test body 2")},
{Headers: map[string]string{}, Body: []byte("test body 1")},
{Headers: map[string]string{}, Body: []byte("test body 2")},
}
prepEvents := []map[string]interface{}{
origEvents[0].toMap(),
Expand Down
8 changes: 4 additions & 4 deletions event.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ package avroipc

// Event acts as an avro event
type Event struct {
headers map[string]string
body []byte
Headers map[string]string
Body []byte
}

func (e *Event) toMap() map[string]interface{} {
m := make(map[string]interface{})
m["headers"] = e.headers
m["body"] = e.body
m["headers"] = e.Headers
m["body"] = e.Body

return m
}
Loading

0 comments on commit c5e86f6

Please sign in to comment.