Skip to content

Commit

Permalink
Extract transports into a separated package (#25)
Browse files Browse the repository at this point in the history
Now the whole code of the project is placed in a single package. But it contains the code to solve different tasks like transferring bytes in different ways (over sockets, buffered transfer, transfer compressed bytes and so on), implementing different protocols between services (avro message protocol, call protocol) and finally providing a completed client to join together all those stuff. As a result, it may be hard to explore the code and to understand where any part of the whole client is placed. So having different packages for the code to solve different kind of task looks sane in this case.
  • Loading branch information
vykulakov committed Dec 29, 2019
1 parent 8f359da commit 440c6f7
Show file tree
Hide file tree
Showing 9 changed files with 44 additions and 37 deletions.
12 changes: 7 additions & 5 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package avroipc
import (
"fmt"
"time"

"github.com/myzhan/avroipc/transports"
)

// Client acts as an avro client
Expand All @@ -15,23 +17,23 @@ type Client interface {
type client struct {
sendTimeout time.Duration

transport Transport
transport transports.Transport
framingLayer FramingLayer
callProtocol CallProtocol
handshakeProtocol HandshakeProtocol
}

// NewClient creates an avro Client, and connect to addr immediately
func NewClient(addr string, timeout, sendTimeout time.Duration, bufferSize, compressionLevel int) (Client, error) {
trans, err := NewSocketTimeout(addr, timeout)
trans, err := transports.NewSocketTimeout(addr, timeout)
if err != nil {
return nil, err
}
if bufferSize > 0 {
trans = NewBufferedTransport(trans, bufferSize)
trans = transports.NewBuffered(trans, bufferSize)
}
if compressionLevel > 0 {
trans, err = NewZlibTransport(trans, compressionLevel)
trans, err = transports.NewZlib(trans, compressionLevel)
if err != nil {
return nil, err
}
Expand All @@ -49,7 +51,7 @@ func NewClient(addr string, timeout, sendTimeout time.Duration, bufferSize, comp
return NewClientWithTrans(trans, proto, sendTimeout)
}

func NewClientWithTrans(trans Transport, proto MessageProtocol, sendTimeout time.Duration) (Client, error) {
func NewClientWithTrans(trans transports.Transport, proto MessageProtocol, sendTimeout time.Duration) (Client, error) {
var err error
c := &client{}
c.sendTimeout = sendTimeout
Expand Down
6 changes: 4 additions & 2 deletions framing.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"encoding/binary"
"fmt"
"io"

"github.com/myzhan/avroipc/transports"
)

const maxFrameSize = 10 * 1024
Expand All @@ -19,12 +21,12 @@ type FramingLayer interface {
type framingLayer struct {
rb bytes.Buffer

trans Transport
trans transports.Transport

serial uint32
}

func NewFramingLayer(trans Transport) FramingLayer {
func NewFramingLayer(trans transports.Transport) FramingLayer {
return &framingLayer{
trans: trans,
}
Expand Down
4 changes: 2 additions & 2 deletions buffered_transport.go → transports/buffered.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package avroipc
package transports

import (
"bufio"
Expand All @@ -11,7 +11,7 @@ type bufferedTransport struct {
trans Transport
}

func NewBufferedTransport(trans Transport, bufferSize int) Transport {
func NewBuffered(trans Transport, bufferSize int) Transport {
return &bufferedTransport{
r: bufio.NewReaderSize(trans, bufferSize),
w: bufio.NewWriterSize(trans, bufferSize),
Expand Down
9 changes: 5 additions & 4 deletions buffered_transport_test.go → transports/buffered_test.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,20 @@
package avroipc_test
package transports_test

import (
"fmt"
"testing"
"time"

"github.com/myzhan/avroipc"
"github.com/myzhan/avroipc/mocks"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

"github.com/myzhan/avroipc/transports"
)

func prepareBufferedTransport() (avroipc.Transport, *mocks.MockTransport) {
func prepareBufferedTransport() (transports.Transport, *mocks.MockTransport) {
m := &mocks.MockTransport{}
b := avroipc.NewBufferedTransport(m, 8)
b := transports.NewBuffered(m, 8)

return b, m
}
Expand Down
2 changes: 1 addition & 1 deletion socket.go → transports/socket.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package avroipc
package transports

import (
"fmt"
Expand Down
25 changes: 13 additions & 12 deletions socket_test.go → transports/socket_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package avroipc_test
package transports_test

import (
"bufio"
Expand All @@ -8,8 +8,9 @@ import (
"testing"
"time"

"github.com/myzhan/avroipc"
"github.com/stretchr/testify/require"

"github.com/myzhan/avroipc/transports"
)

func runServer(t *testing.T) (string, func() error) {
Expand Down Expand Up @@ -56,7 +57,7 @@ func TestSocket(t *testing.T) {
var b []byte

t.Run("error", func(t *testing.T) {
trans, err := avroipc.NewSocket("localhost:12345")
trans, err := transports.NewSocket("localhost:12345")
require.NoError(t, err)

err = trans.Open()
Expand All @@ -66,7 +67,7 @@ func TestSocket(t *testing.T) {
})

t.Run("flush", func(t *testing.T) {
trans, err := avroipc.NewSocket("")
trans, err := transports.NewSocket("")
require.NoError(t, err)

err = trans.Flush()
Expand All @@ -76,7 +77,7 @@ func TestSocket(t *testing.T) {
t.Run("success", func(t *testing.T) {
addr, clean := runServer(t)

trans, err := avroipc.NewSocket(addr)
trans, err := transports.NewSocket(addr)
require.NoError(t, err)

err = trans.Open()
Expand All @@ -101,7 +102,7 @@ func TestSocket(t *testing.T) {
t.Run("timeout", func(t *testing.T) {
addr, clean := runServer(t)

trans, err := avroipc.NewSocketTimeout(addr, 1)
trans, err := transports.NewSocketTimeout(addr, 1)
require.NoError(t, err)

err = trans.Open()
Expand All @@ -113,7 +114,7 @@ func TestSocket(t *testing.T) {
})

t.Run("not open", func(t *testing.T) {
trans, err := avroipc.NewSocket("")
trans, err := transports.NewSocket("")
require.NoError(t, err)

_, err = trans.Read([]byte{})
Expand All @@ -130,7 +131,7 @@ func TestSocket(t *testing.T) {
t.Run("already open", func(t *testing.T) {
addr, clean := runServer(t)

trans, err := avroipc.NewSocket(addr)
trans, err := transports.NewSocket(addr)
require.NoError(t, err)

require.NoError(t, trans.Open())
Expand All @@ -143,7 +144,7 @@ func TestSocket(t *testing.T) {
t.Run("read/write timeout", func(t *testing.T) {
addr, clean := runServer(t)

trans, err := avroipc.NewSocket(addr)
trans, err := transports.NewSocket(addr)
require.NoError(t, err)

err = trans.Open()
Expand All @@ -169,7 +170,7 @@ func TestSocket(t *testing.T) {
t.Run("close multiple times", func(t *testing.T) {
addr, clean := runServer(t)

trans, err := avroipc.NewSocket(addr)
trans, err := transports.NewSocket(addr)
require.NoError(t, err)

require.NoError(t, trans.Close())
Expand All @@ -186,11 +187,11 @@ func TestSocket(t *testing.T) {
}

func TestNewSocket(t *testing.T) {
_, err := avroipc.NewSocket("1:2:3")
_, err := transports.NewSocket("1:2:3")
require.Error(t, err)
}

func TestNewSocketTimeout(t *testing.T) {
_, err := avroipc.NewSocketTimeout("1:2:3", 1)
_, err := transports.NewSocketTimeout("1:2:3", 1)
require.Error(t, err)
}
2 changes: 1 addition & 1 deletion transport.go → transports/transport.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package avroipc
package transports

import (
"io"
Expand Down
4 changes: 2 additions & 2 deletions zlib_transport.go → transports/zlib.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package avroipc
package transports

import (
"compress/zlib"
Expand All @@ -12,7 +12,7 @@ type zlibTransport struct {
trans Transport
}

func NewZlibTransport(trans Transport, level int) (Transport, error) {
func NewZlib(trans Transport, level int) (Transport, error) {
w, err := zlib.NewWriterLevel(trans, level)
if err != nil {
return nil, err
Expand Down
17 changes: 9 additions & 8 deletions zlib_transport_test.go → transports/zlib_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package avroipc_test
package transports_test

import (
"bytes"
Expand All @@ -7,8 +7,9 @@ import (
"testing"
"time"

"github.com/myzhan/avroipc"
"github.com/stretchr/testify/require"

"github.com/myzhan/avroipc/transports"
)

var data = []byte{0x78, 0x01, 0x00, 0x04, 0x00, 0xfb, 0xff, 0x74, 0x65, 0x73, 0x74, 0x01, 0x00, 0x00, 0xff, 0xff, 0x04, 0x5d, 0x01, 0xc1}
Expand Down Expand Up @@ -37,7 +38,7 @@ func (m *mockTransport) SetDeadline(time.Time) error {
func TestZlibTransport_Open(t *testing.T) {
m := &mockTransport{}

trans, err := avroipc.NewZlibTransport(m, 1)
trans, err := transports.NewZlib(m, 1)
require.NoError(t, err)

err = trans.Open()
Expand All @@ -48,7 +49,7 @@ func TestZlibTransport_Read(t *testing.T) {
m := &mockTransport{}
m.Buffer.Write(data)

trans, err := avroipc.NewZlibTransport(m, 1)
trans, err := transports.NewZlib(m, 1)
require.NoError(t, err)

b := make([]byte, 4)
Expand All @@ -63,7 +64,7 @@ func TestZlibTransport_Write(t *testing.T) {
t.Run("short write", func(t *testing.T) {
m := &mockTransport{}

trans, err := avroipc.NewZlibTransport(m, 1)
trans, err := transports.NewZlib(m, 1)
require.NoError(t, err)

b := []byte("test")
Expand All @@ -76,7 +77,7 @@ func TestZlibTransport_Write(t *testing.T) {
t.Run("with close", func(t *testing.T) {
m := &mockTransport{}

trans, err := avroipc.NewZlibTransport(m, 1)
trans, err := transports.NewZlib(m, 1)
require.NoError(t, err)

b := []byte("test")
Expand All @@ -92,7 +93,7 @@ func TestZlibTransport_Write(t *testing.T) {
t.Run("with flush", func(t *testing.T) {
m := &mockTransport{}

trans, err := avroipc.NewZlibTransport(m, 1)
trans, err := transports.NewZlib(m, 1)
require.NoError(t, err)

b := []byte("test")
Expand All @@ -114,7 +115,7 @@ func TestZlibTransport_Write(t *testing.T) {
d := time.Now()
m := &mockTransport{}

trans, err := avroipc.NewZlibTransport(m, 1)
trans, err := transports.NewZlib(m, 1)
require.NoError(t, err)

err = trans.SetDeadline(d)
Expand Down

0 comments on commit 440c6f7

Please sign in to comment.