Skip to content

Commit

Permalink
Move Flume message protocol to the flume package
Browse files Browse the repository at this point in the history
closes #35

This change is moving all message protocol things to the Flume client and now all the code related to the custom clients based on the Avro client (like the Flume client) is encapsulated in the particular package (like the flume package).

It should allow building other clients based on the Avro client which may use other messages protocols.
  • Loading branch information
vykulakov committed Feb 2, 2020
1 parent cc16073 commit c73f2d3
Show file tree
Hide file tree
Showing 10 changed files with 301 additions and 269 deletions.
9 changes: 4 additions & 5 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type client struct {
// endpoint immediately.
//
// This constructor supposed to be used in production environments.
func NewClientWithConfig(addr string, config *Config) (Client, error) {
func NewClientWithConfig(addr string, proto protocols.MessageProtocol, config *Config) (Client, error) {
c := &client{}
c.sendTimeout = config.SendTimeout

Expand All @@ -38,17 +38,16 @@ func NewClientWithConfig(addr string, config *Config) (Client, error) {
return nil, err
}

c.initProtocols()
c.initProtocols(proto)
return c, c.handshake()
}

func (c *client) initProtocols() {
func (c *client) initProtocols(proto protocols.MessageProtocol) {
// All errors here are only related to compilations of Avro schemas
// and are not possible at runtime because they will be caught by unit tests.
proto, _ := protocols.NewAvroSource()
c.framingLayer = layers.NewFraming(c.transport)
c.callProtocol, _ = protocols.NewCall(proto)
c.handshakeProtocol, _ = protocols.NewHandshake()
c.handshakeProtocol, _ = protocols.NewHandshake(proto)
}

func (c *client) initTransports(addr string, config *Config) (err error) {
Expand Down
6 changes: 5 additions & 1 deletion flume/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,11 @@ func NewClient(addr string) (Client, error) {
//
// This constructor supposed to be used in production environments.
func NewClientWithConfig(addr string, config *avroipc.Config) (Client, error) {
c, err := avroipc.NewClientWithConfig(addr, config)
// All errors here are only related to compilations of Avro schemas
// and are not possible at runtime because they will be caught by unit tests.
proto, _ := NewAvroSource()

c, err := avroipc.NewClientWithConfig(addr, proto, config)
if err != nil {
return nil, err
}
Expand Down
111 changes: 111 additions & 0 deletions flume/message.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package flume

import (
"fmt"

"github.com/myzhan/avroipc/protocols"

"github.com/linkedin/goavro"
)

type message struct {
request *goavro.Codec
response *goavro.Codec
errors *goavro.Codec
}

// The Avro message protocol implementation for the Avro RPC protocol for using with the Avro Flume Source.
//
// It has used for preparing an outgoing message from input data and parsing a response message.
//
// The Avro Flume Source haven't documented well now.
type AvroSourceProtocol struct {
messages map[string]message
}

func NewAvroSource() (protocols.MessageProtocol, error) {
p := &AvroSourceProtocol{
messages: make(map[string]message),
}

err := p.init()
if err != nil {
return nil, err
}

return p, nil
}

func (p *AvroSourceProtocol) init() (err error) {
eventCodec, err := goavro.NewCodec(eventSchema)
if err != nil {
return
}
eventsCodec, err := goavro.NewCodec(eventsSchema)
if err != nil {
return
}
errorsCodec, err := goavro.NewCodec(errorsSchema)
if err != nil {
return
}
statusCodec, err := goavro.NewCodec(statusSchema)
if err != nil {
return
}
p.messages["append"] = message{eventCodec, statusCodec, errorsCodec}
p.messages["appendBatch"] = message{eventsCodec, statusCodec, errorsCodec}

return
}

func (p *AvroSourceProtocol) PrepareMessage(method string, datum interface{}) ([]byte, error) {
message, ok := p.messages[method]
if !ok {
return nil, fmt.Errorf("unknown method name: %s", method)
}

return message.request.BinaryFromNative(nil, datum)
}

func (p *AvroSourceProtocol) ParseMessage(method string, responseBytes []byte) (interface{}, []byte, error) {
message, ok := p.messages[method]
if !ok {
return nil, responseBytes, fmt.Errorf("unknown method name: %s", method)
}

return message.response.NativeFromBinary(responseBytes)
}

func (p *AvroSourceProtocol) ParseError(method string, responseBytes []byte) ([]byte, error) {
message, ok := p.messages[method]
if !ok {
return responseBytes, fmt.Errorf("unknown method name: %s", method)
}

response, responseBytes, err := message.errors.NativeFromBinary(responseBytes)
if err != nil {
return responseBytes, err
}

responseMap, ok := response.(map[string]interface{})
if !ok {
return responseBytes, fmt.Errorf("cannot convert error union to map: %v", response)
}

responseInt, ok := responseMap["string"]
if !ok {
return responseBytes, fmt.Errorf("string error not found in map: %v", responseMap)
}

responseStr, ok := responseInt.(string)
if !ok {
return responseBytes, fmt.Errorf("cannot convert string error to string: %v", responseInt)
}

return responseBytes, fmt.Errorf(responseStr)
}

func (p *AvroSourceProtocol) GetSchema() string {
return messageProtocol
}
14 changes: 7 additions & 7 deletions protocols/message_test.go → flume/message_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package protocols_test
package flume_test

import (
"testing"

"github.com/stretchr/testify/require"
"github.com/myzhan/avroipc/flume"

"github.com/myzhan/avroipc/protocols"
"github.com/stretchr/testify/require"
)

func makeDatum(body string) interface{} {
Expand All @@ -25,12 +25,12 @@ func makeArrayDatum(bodies ...string) interface{} {

// Test successful schema compilation
func TestNewAvroSource(t *testing.T) {
_, err := protocols.NewAvroSource()
_, err := flume.NewAvroSource()
require.NoError(t, err)
}

func TestAvroSourceProtocol_PrepareMessage(t *testing.T) {
p, err := protocols.NewAvroSource()
p, err := flume.NewAvroSource()
require.NoError(t, err)

t.Run("bad method", func(t *testing.T) {
Expand Down Expand Up @@ -73,7 +73,7 @@ func TestAvroSourceProtocol_PrepareMessage(t *testing.T) {
}

func TestAvroSourceProtocol_ParseMessage(t *testing.T) {
p, err := protocols.NewAvroSource()
p, err := flume.NewAvroSource()
require.NoError(t, err)

t.Run("bad method", func(t *testing.T) {
Expand Down Expand Up @@ -103,7 +103,7 @@ func TestAvroSourceProtocol_ParseMessage(t *testing.T) {
}

func TestAvroSourceProtocol_ParseError(t *testing.T) {
p, err := protocols.NewAvroSource()
p, err := flume.NewAvroSource()
require.NoError(t, err)

t.Run("bad method", func(t *testing.T) {
Expand Down
123 changes: 123 additions & 0 deletions flume/schemas.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package flume

const errorsSchema = `
{
"type": [
"string"
]
}
`

const eventSchema = `
{
"type": "record",
"name": "AvroFlumeEvent",
"fields": [
{
"name": "headers",
"type": {
"type": "map",
"values": "string"
}
},
{
"name": "body",
"type": "bytes"
}
]
}
`

const eventsSchema = `
{
"type": "array",
"items": {
"type": "record",
"name": "AvroFlumeEvent",
"fields": [
{
"name": "headers",
"type": {
"type": "map",
"values": "string"
}
},
{
"name": "body",
"type": "bytes"
}
]
}
}
`

const statusSchema = `
{
"type": "enum",
"name": "Status",
"symbols": [
"OK",
"FAILED",
"UNKNOWN"
]
}
`

const messageProtocol = `
{
"protocol": "AvroSourceProtocol",
"namespace": "org.apache.flume.source.avro",
"doc": "* Licensed to the Apache Software Foundation (ASF) under one\n * or more contributor license agreements. See the NOTICE file\n * distributed with this work for additional information\n * regarding copyright ownership. The ASF licenses this file\n * to you under the Apache License, Version 2.0 (the\n * \"License\"); you may not use this file except in compliance\n * with the License. You may obtain a copy of the License at\n *\n * http:https://www.apache.org/licenses/LICENSE-2.0\n *\n * Unless required by applicable law or agreed to in writing,\n * software distributed under the License is distributed on an\n * \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n * KIND, either express or implied. See the License for the\n * specific language governing permissions and limitations\n * under the License.",
"types": [
{
"type": "enum",
"name": "Status",
"symbols": [
"OK",
"FAILED",
"UNKNOWN"
]
},
{
"type": "record",
"name": "AvroFlumeEvent",
"fields": [
{
"name": "headers",
"type": {
"type": "map",
"values": "string"
}
},
{
"name": "body",
"type": "bytes"
}
]
}
],
"messages": {
"append": {
"request": [
{
"name": "event",
"type": "AvroFlumeEvent"
}
],
"response": "Status"
},
"appendBatch": {
"request": [
{
"name": "events",
"type": {
"type": "array",
"items": "AvroFlumeEvent"
}
}
],
"response": "Status"
}
}
}
`
5 changes: 5 additions & 0 deletions mocks/message_protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,8 @@ func (p *MockProtocol) ParseError(method string, responseBytes []byte) ([]byte,
args := p.Called(method, responseBytes)
return args.Get(0).([]byte), args.Error(1)
}

func (p *MockProtocol) GetSchema() string {
args := p.Called()
return args.String(0)
}
12 changes: 8 additions & 4 deletions protocols/handshake.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ type HandshakeProtocol interface {
type handshakeProtocol struct {
logger *logrus.Entry

proto MessageProtocol

serverHash []byte
clientHash []byte
clientProtocol string
Expand All @@ -37,11 +39,13 @@ type handshakeProtocol struct {
handshakeResponseCodec *goavro.Codec
}

func NewHandshake() (HandshakeProtocol, error) {
func NewHandshake(proto MessageProtocol) (HandshakeProtocol, error) {
m := proto.GetSchema()
p := &handshakeProtocol{
serverHash: getMD5(messageProtocol),
clientHash: getMD5(messageProtocol),
clientProtocol: messageProtocol,
proto: proto,
serverHash: getMD5(m),
clientHash: getMD5(m),
clientProtocol: m,
}

p.logger = logrus.WithFields(logrus.Fields{
Expand Down
Loading

0 comments on commit c73f2d3

Please sign in to comment.