Skip to content

Commit

Permalink
Use a separate entity to configure the client, close #19 (#30)
Browse files Browse the repository at this point in the history
  • Loading branch information
vykulakov committed Jan 2, 2020
1 parent ba6a320 commit ec69743
Show file tree
Hide file tree
Showing 6 changed files with 185 additions and 44 deletions.
28 changes: 26 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ import (
)

func main() {
// flume avro instance address
client, err := avroipc.NewClient("localhost:20200", 0, 0, 1024, 6)
// Create a new client with default parameters
client, err := avroipc.NewClient("localhost:20200")
if err != nil {
log.Fatal(err)
}
Expand All @@ -44,6 +44,30 @@ func main() {
}
```

To specify particular parameters of the client it is possible to use the config builder:
```go
package main
import (
"log"
"time"

"github.com/myzhan/avroipc"
)

func main() {
config := avroipc.NewConfig()
config.WithTimeout(3*time.Second)

client, err := avroipc.NewClientWithConfig("localhost:20200", config)
if err != nil {
log.Fatal(err)
}

// Use the client as before
_ = client
}
```

## Development

Clone the repository and do the following sequence of command:
Expand Down
77 changes: 45 additions & 32 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"github.com/myzhan/avroipc/transports"
)

// Client acts as an avro client
// An avro client implementation
type Client interface {
Close() error
Append(event *Event) (string, error)
Expand All @@ -25,58 +25,71 @@ type client struct {
handshakeProtocol protocols.HandshakeProtocol
}

// NewClient creates an avro Client, and connect to addr immediately
func NewClient(addr string, timeout, sendTimeout time.Duration, bufferSize, compressionLevel int) (Client, error) {
trans, err := transports.NewSocketTimeout(addr, timeout)
if err != nil {
return nil, err
}
if bufferSize > 0 {
trans = transports.NewBuffered(trans, bufferSize)
}
if compressionLevel > 0 {
trans, err = transports.NewZlib(trans, compressionLevel)
if err != nil {
return nil, err
}
}
err = trans.Open()
// NewClient creates an avro client with default option values and
// connects to the specified remote Flume endpoint immediately.
//
// Very useful for the testing purposes and to build simple examples.
func NewClient(addr string) (Client, error) {
return NewClientWithConfig(addr, NewConfig())
}

// NewClient creates an avro client with considering values of options from
// the passed configuration object and connects to the specified remote Flume
// endpoint immediately.
//
// This constructor supposed to be used in production environments.
func NewClientWithConfig(addr string, config *Config) (Client, error) {
c := &client{}
c.sendTimeout = config.SendTimeout

err := c.initTransports(addr, config)
if err != nil {
return nil, err
}

proto, err := protocols.NewAvroSource()
err = c.initProtocols()
if err != nil {
return nil, err
}

return NewClientWithTrans(trans, proto, sendTimeout)
return c, c.handshake()
}

func NewClientWithTrans(trans transports.Transport, proto protocols.MessageProtocol, sendTimeout time.Duration) (Client, error) {
var err error
c := &client{}
c.sendTimeout = sendTimeout

c.transport = trans
c.framingLayer = layers.NewFraming(trans)
func (c *client) initProtocols() error {
proto, err := protocols.NewAvroSource()
if err != nil {
return err
}

c.framingLayer = layers.NewFraming(c.transport)
c.callProtocol, err = protocols.NewCall(proto)
if err != nil {
return nil, err
return err
}

c.handshakeProtocol, err = protocols.NewHandshake()
if err != nil {
return nil, err
return err
}
return nil
}

err = c.handshake()
func (c *client) initTransports(addr string, config *Config) (err error) {
c.transport, err = transports.NewSocketTimeout(addr, config.Timeout)
if err != nil {
return nil, err
return err
}

if config.BufferSize > 0 {
c.transport = transports.NewBuffered(c.transport, config.BufferSize)
}
if config.CompressionLevel > 0 {
c.transport, err = transports.NewZlib(c.transport, config.CompressionLevel)
if err != nil {
return err
}
}

return c, nil
return c.transport.Open()
}

func (c *client) send(request []byte) ([]byte, error) {
Expand Down
7 changes: 6 additions & 1 deletion client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,12 @@ func TestClient_Append(t *testing.T) {
}
})

client, err := avroipc.NewClient(addr, 1*time.Second, 3*time.Second, d.buffer, d.level)
config := avroipc.NewConfig()
config.WithTimeout(time.Second)
config.WithSendTimeout(3 * time.Second)
config.WithBufferSize(d.buffer)
config.WithCompressionLevel(d.level)
client, err := avroipc.NewClientWithConfig(addr, config)
require.NoError(t, err)

event := &avroipc.Event{
Expand Down
78 changes: 78 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package avroipc

import "time"

// Config provides a configuration for the client. Use the NewConfig method
// to create an instance of the Config and set all necessary parameters of
// the configuration.
type Config struct {
// Connection timeout for the built-in socket transport that limit time of
// connection to a Flume server.
//
// It is sane to always set this timeout before creating a new client
// instance because it will protect against hanging clients.
//
// Defaults to zero which means disabled connection timeout.
Timeout time.Duration

// Used to set read and write deadline of the built-in transports
// (actually, affects only the socket transport). It sets both deadlines
// together at the same time and there is no way to set them separately.
// These deadlines are used to limit execution time of reading and writing
// operations.
//
// This timeout is supposed to be always set to any appropriate for
// a particular situation value except maybe test and example
// configurations.
//
// Defaults to zero which means disabled read/write timeouts.
SendTimeout time.Duration

// A buffer size of the built-in buffered transport.
//
// Defaults to zero which means that the buffered transport won't be used.
BufferSize int
// A compression level of the built-in zlib transport.
//
// Defaults to zero which means that the compression will be disabled.
CompressionLevel int
}

// NewConfig returns a pointer to a new Config instance that is used to
// configure the client at a creation time. Invoking methods of the config
// instance may be chained with each other to specify all necessary config
// options in a single command. A NewConfig call may be also chained with
// other methods to inline config creations.
//
// config := NewConfig()
// config.WithTimeout(3*time.Second)
// client, err := NewClientWithConfig(config)
// or just
// client, err := NewClientWithConfig(NewConfig().WithTimeout(3*time.Second))
func NewConfig() *Config {
return &Config{}
}

// Sets the connection timeout.
func (c *Config) WithTimeout(t time.Duration) *Config {
c.Timeout = t
return c
}

// Sets the read/write timeouts together.
func (c *Config) WithSendTimeout(t time.Duration) *Config {
c.SendTimeout = t
return c
}

// Sets size of the internal buffer of the buffered transport.
func (c *Config) WithBufferSize(s int) *Config {
c.BufferSize = s
return c
}

// Sets the compression level of the zlib transport.
func (c *Config) WithCompressionLevel(l int) *Config {
c.CompressionLevel = l
return c
}
23 changes: 23 additions & 0 deletions config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package avroipc_test

import (
"testing"
"time"

"github.com/myzhan/avroipc"

"github.com/stretchr/testify/require"
)

func TestClient(t *testing.T) {
c := avroipc.NewConfig()
c.WithTimeout(1)
c.WithSendTimeout(2)
c.WithBufferSize(3)
c.WithCompressionLevel(4)

require.Equal(t, time.Duration(1), c.Timeout)
require.Equal(t, time.Duration(2), c.SendTimeout)
require.Equal(t, 3, c.BufferSize)
require.Equal(t, 4, c.CompressionLevel)
}
16 changes: 7 additions & 9 deletions ipc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,20 @@ func TestSend(t *testing.T) {
if addr == "" {
t.Skip("The FLUME_SERVER_ADDRESS environment variable is not set")
}
config := NewConfig()

level := os.Getenv("FLUME_COMPRESSION_LEVEL")
levelInt := 0
if level != "" {
var err error
levelInt, err = strconv.Atoi(level)
l, err := strconv.Atoi(level)
require.NoError(t, err)
config.WithCompressionLevel(l)
}

bufferSize := 8
config.WithBufferSize(0)
config.WithTimeout(1 * time.Second)
config.WithSendTimeout(3 * time.Second)

timeout := time.Duration(0)
sendTimeout := time.Duration(0)

// flume avro instance address
client, err := NewClient(addr, timeout, sendTimeout, bufferSize, levelInt)
client, err := NewClientWithConfig(addr, config)
require.NoError(t, err)

event := &Event{
Expand Down

0 comments on commit ec69743

Please sign in to comment.