Skip to content

Commit

Permalink
Remove the open method from transports (#31)
Browse files Browse the repository at this point in the history
The original design with the Open method and the possibility to postpone opening a connection to a Flume endpoint looks not so useful now and just produces many additional and unnecessary code. Removing such feature simplifies the code and makes it more clear.
  • Loading branch information
vykulakov committed Jan 2, 2020
1 parent ec69743 commit 1435beb
Show file tree
Hide file tree
Showing 8 changed files with 49 additions and 218 deletions.
4 changes: 2 additions & 2 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (c *client) initProtocols() error {
}

func (c *client) initTransports(addr string, config *Config) (err error) {
c.transport, err = transports.NewSocketTimeout(addr, config.Timeout)
c.transport, err = transports.NewSocket(addr, config.Timeout)
if err != nil {
return err
}
Expand All @@ -89,7 +89,7 @@ func (c *client) initTransports(addr string, config *Config) (err error) {
}
}

return c.transport.Open()
return
}

func (c *client) send(request []byte) ([]byte, error) {
Expand Down
4 changes: 0 additions & 4 deletions transports/buffered.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,6 @@ func NewBuffered(trans Transport, bufferSize int) Transport {
}
}

func (p *bufferedTransport) Open() (err error) {
return p.trans.Open()
}

func (p *bufferedTransport) Close() (err error) {
return p.trans.Close()
}
Expand Down
22 changes: 0 additions & 22 deletions transports/buffered_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,28 +19,6 @@ func prepareBufferedTransport() (transports.Transport, *mocks.MockTransport) {
return b, m
}

func TestBufferedTransport_Open(t *testing.T) {
t.Run("succeed", func(t *testing.T) {
b, m := prepareBufferedTransport()

m.On("Open").Return(nil).Once()

err := b.Open()
require.NoError(t, err)
m.AssertExpectations(t)
})

t.Run("failed", func(t *testing.T) {
b, m := prepareBufferedTransport()

m.On("Open").Return(fmt.Errorf("test error")).Once()

err := b.Open()
require.EqualError(t, err, "test error")
m.AssertExpectations(t)
})
}

func TestBufferedTransport_Close(t *testing.T) {
t.Run("succeed", func(t *testing.T) {
b, m := prepareBufferedTransport()
Expand Down
68 changes: 6 additions & 62 deletions transports/socket.go
Original file line number Diff line number Diff line change
@@ -1,85 +1,29 @@
package transports

import (
"fmt"
"net"
"time"
)

type socket struct {
conn net.Conn
addr net.Addr
timeout time.Duration
net.Conn
}

func NewSocket(hostPort string) (Transport, error) {
return NewSocketTimeout(hostPort, 0)
}

func NewSocketTimeout(hostPort string, timeout time.Duration) (Transport, error) {
func NewSocket(hostPort string, timeout time.Duration) (Transport, error) {
addr, err := net.ResolveTCPAddr("tcp", hostPort)
if err != nil {
return nil, err
}

return &socket{addr: addr, timeout: timeout}, nil
}

// Connects the socket, creating a new socket object if necessary.
func (s *socket) Open() error {
if s.conn != nil {
return fmt.Errorf("already open")
}

conn, err := net.DialTimeout(s.addr.Network(), s.addr.String(), s.timeout)
if err != nil {
return err
}

s.conn = conn

return nil
}

func (s *socket) Close() error {
if s.conn == nil {
return nil
}

err := s.conn.Close()
s := &socket{}
s.Conn, err = net.DialTimeout(addr.Network(), addr.String(), timeout)
if err != nil {
return err
}

s.conn = nil

return nil
}

func (s *socket) Read(buf []byte) (int, error) {
if s.conn == nil {
return 0, fmt.Errorf("not open")
}

n, err := s.conn.Read(buf)
fmt.Printf("R[%04d]: %X\n", n, buf[:n])
return n, err
}

func (s *socket) Write(buf []byte) (int, error) {
if s.conn == nil {
return 0, fmt.Errorf("not open")
return nil, err
}

n, err := s.conn.Write(buf)
fmt.Printf("W[%04d]: %X\n", n, buf)
return n, err
return s, nil
}

func (s *socket) Flush() error {
return nil
}

func (s *socket) SetDeadline(d time.Time) error {
return s.conn.SetDeadline(d)
}
126 changes: 32 additions & 94 deletions transports/socket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package transports_test
import (
"bufio"
"fmt"
"io"
"net"
"testing"
"time"
Expand All @@ -30,114 +29,68 @@ func handler(conn net.Conn) error {
return s.Err()
}

func TestSocket(t *testing.T) {
var n int
var b []byte

t.Run("error", func(t *testing.T) {
trans, err := transports.NewSocket("localhost:12345")
require.NoError(t, err)
func prepareSocket(t *testing.T) (transports.Transport, func() error) {
addr, clean := internal.RunServer(t, handler)

err = trans.Open()
require.Error(t, err)

require.NoError(t, trans.Close())
})
trans, err := transports.NewSocket(addr, time.Second)
require.NoError(t, err)

t.Run("flush", func(t *testing.T) {
trans, err := transports.NewSocket("")
require.NoError(t, err)

err = trans.Flush()
require.NoError(t, err)
})
return trans, clean
}

func TestSocket(t *testing.T) {
t.Run("success", func(t *testing.T) {
addr, clean := internal.RunServer(t, handler)

trans, err := transports.NewSocket(addr)
require.NoError(t, err)
trans, clean := prepareSocket(t)

err = trans.Open()
err := trans.Flush()
require.NoError(t, err)

b = []byte("ping\n")
n, err = trans.Write(b)
_, err = trans.Write([]byte("ping\n"))
require.NoError(t, err)
require.Equal(t, 5, n)

b = make([]byte, 4)
n, err = io.ReadFull(trans, b)
b := &internal.Buffer{}
err = b.ReadFrom(trans)
require.NoError(t, err)
require.Equal(t, 4, n)
require.Equal(t, []byte("pong"), b[:n])
require.Equal(t, []byte("pong"), b.Bytes())

require.NoError(t, clean())
require.NoError(t, trans.Close())
})

// TODO Use more robust method to test timeout errors
// TODO Use a more robust method to test timeout errors
t.Run("timeout", func(t *testing.T) {
addr, clean := internal.RunServer(t, handler)

trans, err := transports.NewSocketTimeout(addr, 1)
require.NoError(t, err)

err = trans.Open()
_, err := transports.NewSocket(addr, 1)
require.Error(t, err)
require.Contains(t, err.Error(), "i/o timeout")

require.NoError(t, clean())
require.NoError(t, trans.Close())
})

t.Run("not open", func(t *testing.T) {
trans, err := transports.NewSocket("")
require.NoError(t, err)

_, err = trans.Read([]byte{})
require.Error(t, err)
require.Contains(t, err.Error(), "not open")

_, err = trans.Write([]byte{})
t.Run("bad address", func(t *testing.T) {
_, err := transports.NewSocket("1:2:3", time.Second)
require.Error(t, err)
require.Contains(t, err.Error(), "not open")

require.NoError(t, trans.Close())
require.Contains(t, err.Error(), "too many colons in address")
})

t.Run("already open", func(t *testing.T) {
addr, clean := internal.RunServer(t, handler)

trans, err := transports.NewSocket(addr)
require.NoError(t, err)

require.NoError(t, trans.Open())
require.Error(t, trans.Open())

require.NoError(t, clean())
require.NoError(t, trans.Close())
t.Run("connection refused", func(t *testing.T) {
_, err := transports.NewSocket("localhost:12345", time.Second)
require.Error(t, err)
require.Contains(t, err.Error(), "connect: connection refused")
})

t.Run("read/write timeout", func(t *testing.T) {
addr, clean := internal.RunServer(t, handler)

trans, err := transports.NewSocket(addr)
require.NoError(t, err)
trans, clean := prepareSocket(t)

err = trans.Open()
err := trans.SetDeadline(time.Now().Add(time.Second))
require.NoError(t, err)

err = trans.SetDeadline(time.Now().Add(time.Second))
_, err = trans.Write([]byte("sleep\n"))
require.NoError(t, err)

b = []byte("sleep\n")
n, err = trans.Write(b)
require.NoError(t, err)
require.Equal(t, 6, n)

b = make([]byte, 5)
n, err = io.ReadFull(trans, b)
b := &internal.Buffer{}
err = b.ReadFrom(trans)
require.Error(t, err)
require.Contains(t, err.Error(), "i/o timeout")

Expand All @@ -146,30 +99,15 @@ func TestSocket(t *testing.T) {
})

t.Run("close multiple times", func(t *testing.T) {
addr, clean := internal.RunServer(t, handler)
trans, clean := prepareSocket(t)

trans, err := transports.NewSocket(addr)
err := trans.Close()
require.NoError(t, err)

require.NoError(t, trans.Close())
require.NoError(t, trans.Close())

err = trans.Open()
require.NoError(t, err)

require.NoError(t, trans.Close())
require.NoError(t, trans.Close())
err = trans.Close()
require.Error(t, err)
require.Contains(t, err.Error(), "use of closed network connection")

require.NoError(t, clean())
})
}

func TestNewSocket(t *testing.T) {
_, err := transports.NewSocket("1:2:3")
require.Error(t, err)
}

func TestNewSocketTimeout(t *testing.T) {
_, err := transports.NewSocketTimeout("1:2:3", 1)
require.Error(t, err)
}
1 change: 0 additions & 1 deletion transports/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
type Transport interface {
io.ReadWriteCloser

Open() error
Flush() error
SetDeadline(t time.Time) error
}
4 changes: 0 additions & 4 deletions transports/zlib.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,6 @@ func NewZlib(trans Transport, level int) (Transport, error) {
}, nil
}

func (t *zlibTransport) Open() error {
return t.trans.Open()
}

func (t *zlibTransport) Close() error {
if t.r != nil {
err := t.r.Close()
Expand Down
Loading

0 comments on commit 1435beb

Please sign in to comment.