From 1435beb145dd4a9b288137ce82e156cc91261dca Mon Sep 17 00:00:00 2001 From: slava Date: Thu, 2 Jan 2020 22:34:13 +0300 Subject: [PATCH] Remove the open method from transports (#31) The original design with the Open method and the possibility to postpone opening a connection to a Flume endpoint looks not so useful now and just produces many additional and unnecessary code. Removing such feature simplifies the code and makes it more clear. --- client.go | 4 +- transports/buffered.go | 4 -- transports/buffered_test.go | 22 ------- transports/socket.go | 68 ++----------------- transports/socket_test.go | 126 +++++++++--------------------------- transports/transport.go | 1 - transports/zlib.go | 4 -- transports/zlib_test.go | 38 +++-------- 8 files changed, 49 insertions(+), 218 deletions(-) diff --git a/client.go b/client.go index bfe6973..c25374b 100644 --- a/client.go +++ b/client.go @@ -74,7 +74,7 @@ func (c *client) initProtocols() error { } func (c *client) initTransports(addr string, config *Config) (err error) { - c.transport, err = transports.NewSocketTimeout(addr, config.Timeout) + c.transport, err = transports.NewSocket(addr, config.Timeout) if err != nil { return err } @@ -89,7 +89,7 @@ func (c *client) initTransports(addr string, config *Config) (err error) { } } - return c.transport.Open() + return } func (c *client) send(request []byte) ([]byte, error) { diff --git a/transports/buffered.go b/transports/buffered.go index 9c3e05d..4e9a2d3 100644 --- a/transports/buffered.go +++ b/transports/buffered.go @@ -19,10 +19,6 @@ func NewBuffered(trans Transport, bufferSize int) Transport { } } -func (p *bufferedTransport) Open() (err error) { - return p.trans.Open() -} - func (p *bufferedTransport) Close() (err error) { return p.trans.Close() } diff --git a/transports/buffered_test.go b/transports/buffered_test.go index 6f39878..d12850a 100644 --- a/transports/buffered_test.go +++ b/transports/buffered_test.go @@ -19,28 +19,6 @@ func prepareBufferedTransport() (transports.Transport, *mocks.MockTransport) { return b, m } -func TestBufferedTransport_Open(t *testing.T) { - t.Run("succeed", func(t *testing.T) { - b, m := prepareBufferedTransport() - - m.On("Open").Return(nil).Once() - - err := b.Open() - require.NoError(t, err) - m.AssertExpectations(t) - }) - - t.Run("failed", func(t *testing.T) { - b, m := prepareBufferedTransport() - - m.On("Open").Return(fmt.Errorf("test error")).Once() - - err := b.Open() - require.EqualError(t, err, "test error") - m.AssertExpectations(t) - }) -} - func TestBufferedTransport_Close(t *testing.T) { t.Run("succeed", func(t *testing.T) { b, m := prepareBufferedTransport() diff --git a/transports/socket.go b/transports/socket.go index 2f006b9..53809d9 100644 --- a/transports/socket.go +++ b/transports/socket.go @@ -1,85 +1,29 @@ package transports import ( - "fmt" "net" "time" ) type socket struct { - conn net.Conn - addr net.Addr - timeout time.Duration + net.Conn } -func NewSocket(hostPort string) (Transport, error) { - return NewSocketTimeout(hostPort, 0) -} - -func NewSocketTimeout(hostPort string, timeout time.Duration) (Transport, error) { +func NewSocket(hostPort string, timeout time.Duration) (Transport, error) { addr, err := net.ResolveTCPAddr("tcp", hostPort) if err != nil { return nil, err } - return &socket{addr: addr, timeout: timeout}, nil -} - -// Connects the socket, creating a new socket object if necessary. -func (s *socket) Open() error { - if s.conn != nil { - return fmt.Errorf("already open") - } - - conn, err := net.DialTimeout(s.addr.Network(), s.addr.String(), s.timeout) - if err != nil { - return err - } - - s.conn = conn - - return nil -} - -func (s *socket) Close() error { - if s.conn == nil { - return nil - } - - err := s.conn.Close() + s := &socket{} + s.Conn, err = net.DialTimeout(addr.Network(), addr.String(), timeout) if err != nil { - return err - } - - s.conn = nil - - return nil -} - -func (s *socket) Read(buf []byte) (int, error) { - if s.conn == nil { - return 0, fmt.Errorf("not open") - } - - n, err := s.conn.Read(buf) - fmt.Printf("R[%04d]: %X\n", n, buf[:n]) - return n, err -} - -func (s *socket) Write(buf []byte) (int, error) { - if s.conn == nil { - return 0, fmt.Errorf("not open") + return nil, err } - n, err := s.conn.Write(buf) - fmt.Printf("W[%04d]: %X\n", n, buf) - return n, err + return s, nil } func (s *socket) Flush() error { return nil } - -func (s *socket) SetDeadline(d time.Time) error { - return s.conn.SetDeadline(d) -} diff --git a/transports/socket_test.go b/transports/socket_test.go index d9b2191..45d5ae5 100644 --- a/transports/socket_test.go +++ b/transports/socket_test.go @@ -3,7 +3,6 @@ package transports_test import ( "bufio" "fmt" - "io" "net" "testing" "time" @@ -30,114 +29,68 @@ func handler(conn net.Conn) error { return s.Err() } -func TestSocket(t *testing.T) { - var n int - var b []byte - - t.Run("error", func(t *testing.T) { - trans, err := transports.NewSocket("localhost:12345") - require.NoError(t, err) +func prepareSocket(t *testing.T) (transports.Transport, func() error) { + addr, clean := internal.RunServer(t, handler) - err = trans.Open() - require.Error(t, err) - - require.NoError(t, trans.Close()) - }) + trans, err := transports.NewSocket(addr, time.Second) + require.NoError(t, err) - t.Run("flush", func(t *testing.T) { - trans, err := transports.NewSocket("") - require.NoError(t, err) - - err = trans.Flush() - require.NoError(t, err) - }) + return trans, clean +} +func TestSocket(t *testing.T) { t.Run("success", func(t *testing.T) { - addr, clean := internal.RunServer(t, handler) - - trans, err := transports.NewSocket(addr) - require.NoError(t, err) + trans, clean := prepareSocket(t) - err = trans.Open() + err := trans.Flush() require.NoError(t, err) - b = []byte("ping\n") - n, err = trans.Write(b) + _, err = trans.Write([]byte("ping\n")) require.NoError(t, err) - require.Equal(t, 5, n) - b = make([]byte, 4) - n, err = io.ReadFull(trans, b) + b := &internal.Buffer{} + err = b.ReadFrom(trans) require.NoError(t, err) - require.Equal(t, 4, n) - require.Equal(t, []byte("pong"), b[:n]) + require.Equal(t, []byte("pong"), b.Bytes()) require.NoError(t, clean()) require.NoError(t, trans.Close()) }) - // TODO Use more robust method to test timeout errors + // TODO Use a more robust method to test timeout errors t.Run("timeout", func(t *testing.T) { addr, clean := internal.RunServer(t, handler) - trans, err := transports.NewSocketTimeout(addr, 1) - require.NoError(t, err) - - err = trans.Open() + _, err := transports.NewSocket(addr, 1) require.Error(t, err) require.Contains(t, err.Error(), "i/o timeout") require.NoError(t, clean()) - require.NoError(t, trans.Close()) }) - t.Run("not open", func(t *testing.T) { - trans, err := transports.NewSocket("") - require.NoError(t, err) - - _, err = trans.Read([]byte{}) - require.Error(t, err) - require.Contains(t, err.Error(), "not open") - - _, err = trans.Write([]byte{}) + t.Run("bad address", func(t *testing.T) { + _, err := transports.NewSocket("1:2:3", time.Second) require.Error(t, err) - require.Contains(t, err.Error(), "not open") - - require.NoError(t, trans.Close()) + require.Contains(t, err.Error(), "too many colons in address") }) - t.Run("already open", func(t *testing.T) { - addr, clean := internal.RunServer(t, handler) - - trans, err := transports.NewSocket(addr) - require.NoError(t, err) - - require.NoError(t, trans.Open()) - require.Error(t, trans.Open()) - - require.NoError(t, clean()) - require.NoError(t, trans.Close()) + t.Run("connection refused", func(t *testing.T) { + _, err := transports.NewSocket("localhost:12345", time.Second) + require.Error(t, err) + require.Contains(t, err.Error(), "connect: connection refused") }) t.Run("read/write timeout", func(t *testing.T) { - addr, clean := internal.RunServer(t, handler) - - trans, err := transports.NewSocket(addr) - require.NoError(t, err) + trans, clean := prepareSocket(t) - err = trans.Open() + err := trans.SetDeadline(time.Now().Add(time.Second)) require.NoError(t, err) - err = trans.SetDeadline(time.Now().Add(time.Second)) + _, err = trans.Write([]byte("sleep\n")) require.NoError(t, err) - b = []byte("sleep\n") - n, err = trans.Write(b) - require.NoError(t, err) - require.Equal(t, 6, n) - - b = make([]byte, 5) - n, err = io.ReadFull(trans, b) + b := &internal.Buffer{} + err = b.ReadFrom(trans) require.Error(t, err) require.Contains(t, err.Error(), "i/o timeout") @@ -146,30 +99,15 @@ func TestSocket(t *testing.T) { }) t.Run("close multiple times", func(t *testing.T) { - addr, clean := internal.RunServer(t, handler) + trans, clean := prepareSocket(t) - trans, err := transports.NewSocket(addr) + err := trans.Close() require.NoError(t, err) - require.NoError(t, trans.Close()) - require.NoError(t, trans.Close()) - - err = trans.Open() - require.NoError(t, err) - - require.NoError(t, trans.Close()) - require.NoError(t, trans.Close()) + err = trans.Close() + require.Error(t, err) + require.Contains(t, err.Error(), "use of closed network connection") require.NoError(t, clean()) }) } - -func TestNewSocket(t *testing.T) { - _, err := transports.NewSocket("1:2:3") - require.Error(t, err) -} - -func TestNewSocketTimeout(t *testing.T) { - _, err := transports.NewSocketTimeout("1:2:3", 1) - require.Error(t, err) -} diff --git a/transports/transport.go b/transports/transport.go index 801ac35..4c88920 100644 --- a/transports/transport.go +++ b/transports/transport.go @@ -8,7 +8,6 @@ import ( type Transport interface { io.ReadWriteCloser - Open() error Flush() error SetDeadline(t time.Time) error } diff --git a/transports/zlib.go b/transports/zlib.go index 27d4077..7a06bc0 100644 --- a/transports/zlib.go +++ b/transports/zlib.go @@ -24,10 +24,6 @@ func NewZlib(trans Transport, level int) (Transport, error) { }, nil } -func (t *zlibTransport) Open() error { - return t.trans.Open() -} - func (t *zlibTransport) Close() error { if t.r != nil { err := t.r.Close() diff --git a/transports/zlib_test.go b/transports/zlib_test.go index 3bb0fb7..0559596 100644 --- a/transports/zlib_test.go +++ b/transports/zlib_test.go @@ -19,10 +19,6 @@ type mockTransport struct { bytes.Buffer } -func (m *mockTransport) Open() error { - return errors.New("test error") -} - func (m *mockTransport) Close() error { return nil } @@ -35,22 +31,18 @@ func (m *mockTransport) SetDeadline(time.Time) error { return errors.New("test error") } -func TestZlibTransport_Open(t *testing.T) { +func prepareZlibTransport(t *testing.T, data []byte) (transports.Transport, *mockTransport) { m := &mockTransport{} + m.Buffer.Write(data) trans, err := transports.NewZlib(m, 1) require.NoError(t, err) - err = trans.Open() - require.EqualError(t, err, "test error") + return trans, m } func TestZlibTransport_Read(t *testing.T) { - m := &mockTransport{} - m.Buffer.Write(data) - - trans, err := transports.NewZlib(m, 1) - require.NoError(t, err) + trans, _ := prepareZlibTransport(t, data) b := make([]byte, 4) n, err := trans.Read(b) @@ -62,10 +54,7 @@ func TestZlibTransport_Read(t *testing.T) { func TestZlibTransport_Write(t *testing.T) { t.Run("short write", func(t *testing.T) { - m := &mockTransport{} - - trans, err := transports.NewZlib(m, 1) - require.NoError(t, err) + trans, m := prepareZlibTransport(t, []byte{}) b := []byte("test") n, err := trans.Write(b) @@ -75,10 +64,7 @@ func TestZlibTransport_Write(t *testing.T) { require.Equal(t, data[:2], m.Bytes()) }) t.Run("with close", func(t *testing.T) { - m := &mockTransport{} - - trans, err := transports.NewZlib(m, 1) - require.NoError(t, err) + trans, m := prepareZlibTransport(t, []byte{}) b := []byte("test") n, err := trans.Write(b) @@ -91,10 +77,7 @@ func TestZlibTransport_Write(t *testing.T) { require.Equal(t, data, m.Bytes()) }) t.Run("with flush", func(t *testing.T) { - m := &mockTransport{} - - trans, err := transports.NewZlib(m, 1) - require.NoError(t, err) + trans, m := prepareZlibTransport(t, []byte{}) b := []byte("test") n, err := trans.Write(b) @@ -113,12 +96,9 @@ func TestZlibTransport_Write(t *testing.T) { }) t.Run("set deadline", func(t *testing.T) { d := time.Now() - m := &mockTransport{} - - trans, err := transports.NewZlib(m, 1) - require.NoError(t, err) + trans, _ := prepareZlibTransport(t, []byte{}) - err = trans.SetDeadline(d) + err := trans.SetDeadline(d) require.EqualError(t, err, "test error") }) }