Skip to content

Commit

Permalink
Add more tests for the client, close #20 (#29)
Browse files Browse the repository at this point in the history
  • Loading branch information
vykulakov committed Jan 1, 2020
1 parent 42e3ba2 commit ba6a320
Show file tree
Hide file tree
Showing 4 changed files with 223 additions and 3 deletions.
177 changes: 177 additions & 0 deletions client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
package avroipc_test

import (
"bytes"
"io"
"net"
"testing"
"time"

"github.com/myzhan/avroipc/internal"

"github.com/stretchr/testify/require"

"github.com/myzhan/avroipc"
)

type pair struct {
req []byte
resp []byte
}

func TestClient_Append(t *testing.T) {
data := map[string]struct {
pairs []pair

level int
buffer int
}{
"plain data": {
pairs: []pair{{
// Handshake request
req: []byte{
// The frame serial: 1
0x00, 0x00, 0x00, 0x01,
// The number of frames: 1
0x00, 0x00, 0x00, 0x01,
// The frame length: 36
0x00, 0x00, 0x00, 0x24,
// MD5 hash of the client message protocol
0x49, 0x87, 0x43, 0x7B, 0xF5, 0x09, 0xDF, 0xDE, 0x62, 0x36, 0x72, 0x99, 0xEF, 0x40, 0xC8, 0x2F,
// The client message protocol, don't pass it by default: null
0x00,
// MD5 hash of the server message protocol that already known by client for the server
0x49, 0x87, 0x43, 0x7B, 0xF5, 0x09, 0xDF, 0xDE, 0x62, 0x36, 0x72, 0x99, 0xEF, 0x40, 0xC8, 0x2F,
// Meta
0x00,
// Empty message
0x00, 0x00,
},
resp: []byte{
// The frame serial: 1
0x00, 0x00, 0x00, 0x01,
// The number of frames: 1
0x00, 0x00, 0x00, 0x01,
// The frame length: 25
0x00, 0x00, 0x00, 0x19,
// Match field: NONE
0x02,
// The server message protocol: type(string):length(4):value(tttt)
0x02, 0x08, 0x74, 0x74, 0x74, 0x74,
// MD5 hash of the server message protocol: type(MD5):value(...)
0x02, 0x86, 0xAA, 0xDA, 0xE2, 0xC4, 0x54, 0x74, 0xC0, 0xFE, 0x93, 0xFF, 0xD0, 0xF2, 0x35, 0x0A, 0x65,
// Meta
0x00,
},
}, {
// Regular append call
req: []byte{
// The frame serial: 2
0x00, 0x00, 0x00, 0x02,
// The number of frames: 1
0x00, 0x00, 0x00, 0x01,
// The frame length: 14
0x00, 0x00, 0x00, 0x0E,
// Meta
0x00,
// Method: length(6):value(append)
0x0C, 0x61, 0x70, 0x70, 0x65, 0x6E, 0x64,
// Event header
0x00,
// Event body: length(4):value(tttt)
0x08, 0x74, 0x74, 0x74, 0x74,
},
resp: []byte{
// The frame serial: 2
0x00, 0x00, 0x00, 0x02,
// The number of frames: 3
0x00, 0x00, 0x00, 0x03,
// The frame length: 0
0x00, 0x00, 0x00, 0x00,
// The frame length: 1
0x00, 0x00, 0x00, 0x01,
// Meta
0x00,
// The frame length: 2
0x00, 0x00, 0x00, 0x02,
// Response flag
0x00,
// Response status
0x00,
},
}},
level: 0,
buffer: 0,
},
"compressed data": {
pairs: []pair{{
// Handshake request
req: []byte{
// Compressed data
0x78, 0x9C,
0x62, 0x60, 0x60, 0x60, 0x84, 0x62, 0x15, 0xCF, 0x76, 0xE7, 0xEA, 0xAF, 0x9C, 0xF7, 0xEF, 0x25, 0x99, 0x15, 0xCD, 0x7C, 0xEF, 0x70, 0x42, 0x9F, 0x01, 0x43, 0x80, 0x81, 0x01, 0x00,
0x00, 0x00, 0xFF, 0xFF,
},
resp: []byte{
// Compressed data
0x78, 0x9C,
0x62, 0x60, 0x60, 0x60, 0x84, 0x62, 0x49, 0x26, 0x26, 0x8E, 0x92, 0x92, 0x92, 0x12, 0xA6, 0xB6, 0x55, 0xB7, 0x1E, 0x1D, 0x09, 0x29, 0x39, 0xF0, 0x6F, 0xF2, 0xFF, 0x0B, 0x9F, 0x4C, 0xB9, 0x52, 0x19, 0x00, 0x00,
0x00, 0x00, 0xFF, 0xFF,
},
}, {
// Regular append call
req: []byte{
// Compressed data
0x62, 0x60, 0x60, 0x60, 0x82, 0xAA, 0xE7, 0x63, 0xE0, 0x49, 0x2C, 0x28, 0x48, 0xCD, 0x4B, 0x61, 0xE0, 0x28, 0x29, 0x29, 0x29, 0x01, 0x00,
0x00, 0x00, 0xFF, 0xFF,
},
resp: []byte{
// Compressed data
0x62, 0x60, 0x60, 0x60, 0x62, 0x60, 0x60, 0x60, 0x66, 0x80, 0x00, 0x90, 0x62, 0x90, 0x00, 0x00,
0x00, 0x00, 0xFF, 0xFF,
},
}},
level: 6,
buffer: 1024,
},
}
for n, d := range data {
t.Run(n, func(t *testing.T) {
addr, clean := internal.RunServer(t, func(conn net.Conn) error {
req := internal.Buffer{}
for {
err := req.ReadFrom(conn)
if err == io.EOF {
return nil
}
if err != nil {
return err
}

for _, p := range d.pairs {
if bytes.Equal(req.Bytes(), p.req) {
req.Reset()
_, err := conn.Write(p.resp)
if err != nil {
return err
}
}
}
}
})

client, err := avroipc.NewClient(addr, 1*time.Second, 3*time.Second, d.buffer, d.level)
require.NoError(t, err)

event := &avroipc.Event{
Body: []byte("tttt"),
}
status, err := client.Append(event)
require.NoError(t, err)
require.Equal(t, "OK", status)

require.NoError(t, client.Close())
require.NoError(t, clean())
})
}
}
38 changes: 38 additions & 0 deletions internal/buffer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package internal

import (
"io"
)

type Buffer struct {
buf []byte
}

func (b *Buffer) Len() int {
return len(b.buf)
}

func (b *Buffer) Bytes() []byte {
return b.buf
}

func (b *Buffer) Reset() {
b.buf = b.buf[:0]
}

func (b *Buffer) ReadFrom(r io.Reader) error {
buf := make([]byte, 1024)

for {
n, err := r.Read(buf)
if err != nil {
return err
}
b.buf = append(b.buf, buf[:n]...)
if n < len(buf) {
break
}
}

return nil
}
3 changes: 2 additions & 1 deletion internal/server.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package internal

import (
"github.com/stretchr/testify/require"
"net"
"testing"

"github.com/stretchr/testify/require"
)

func RunServer(t *testing.T, handler func(net.Conn) error) (string, func() error) {
Expand Down
8 changes: 6 additions & 2 deletions transports/socket.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,15 +61,19 @@ func (s *socket) Read(buf []byte) (int, error) {
return 0, fmt.Errorf("not open")
}

return s.conn.Read(buf)
n, err := s.conn.Read(buf)
fmt.Printf("R[%04d]: %X\n", n, buf[:n])
return n, err
}

func (s *socket) Write(buf []byte) (int, error) {
if s.conn == nil {
return 0, fmt.Errorf("not open")
}

return s.conn.Write(buf)
n, err := s.conn.Write(buf)
fmt.Printf("W[%04d]: %X\n", n, buf)
return n, err
}

func (s *socket) Flush() error {
Expand Down

0 comments on commit ba6a320

Please sign in to comment.