feat: protocol v2 #1

Merged: 6 commits merged on Mar 4, 2021

Changes from 1 commit
flesh out protocol
b1naryth1ef committed Feb 23, 2021
commit 0060676346325bc83bca05fcdd34bdcabe4db1d8
13 changes: 9 additions & 4 deletions bristle.proto
@@ -33,8 +33,10 @@ message WriteBatchResponse {
 }

 message StreamingClientMessageWriteBatch {
-  string type = 1;
-  bytes data = 2;
+  uint32 id = 1;
+  string type = 2;
+  uint32 size = 3;
+  bytes data = 4;
 }

 enum BatchResult {
@@ -54,15 +56,18 @@ message StreamingServerMessageTypeInfo {
   uint32 max_batch_size = 3;
 }

-message StreamingServerMessageWriteBatchResult { BatchResult result = 1; }
+message StreamingServerMessageWriteBatchResult {
+  uint32 id = 1;
+  BatchResult result = 2;
+}

 message StreamingClientMessageUpdateDefault {
   string type = 1;
   bytes default = 2;
 }

 message StreamingServerMessageBackoff {
-  uint32 duration = 1;
+  uint64 until = 1;
   repeated string types = 2;
 }
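The write batch now carries a client-assigned id, which the new id field on StreamingServerMessageWriteBatchResult echoes back so results can be matched to in-flight batches; size is the number of messages in the batch, and data packs them as length-delimited payloads. Below is a minimal sketch of unpacking such a payload on the receiving side, assuming the protowire.AppendBytes framing the client uses later in this commit; decodeBatch is a hypothetical helper, not part of this PR:

package main

import (
	"fmt"

	"google.golang.org/protobuf/encoding/protowire"
)

// decodeBatch (hypothetical helper) splits a payload built with
// protowire.AppendBytes back into the individual message buffers.
func decodeBatch(data []byte) ([][]byte, error) {
	var out [][]byte
	for len(data) > 0 {
		msg, n := protowire.ConsumeBytes(data)
		if n < 0 {
			return nil, protowire.ParseError(n)
		}
		out = append(out, msg)
		data = data[n:]
	}
	return out, nil
}

func main() {
	// Frame two payloads the same way WriteBatchSync frames each
	// proto.Marshal result.
	var data []byte
	data = protowire.AppendBytes(data, []byte("hi"))
	data = protowire.AppendBytes(data, []byte("yo"))

	msgs, err := decodeBatch(data)
	fmt.Println(len(msgs), err) // 2 <nil>
}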
14 changes: 0 additions & 14 deletions client.go

This file was deleted.

103 changes: 103 additions & 0 deletions client/batcher.go
@@ -0,0 +1,103 @@
package client

import (
	"context"
	"errors"
	"sync"
	"time"

	"github.com/rs/zerolog/log"
	"google.golang.org/protobuf/proto"
)

type BristleBatcherConfig struct {
	// Maximum number of messages to allow being buffered in memory
	BufferSize int

	// Whether writing to the upstream will perform retries on error, or drop
	// payloads
	Retry bool

	// Interval on which we will attempt to flush to the upstream
	FlushInterval time.Duration
}

type BristleBatcher struct {
	sync.Mutex

	client  *BristleClient
	config  BristleBatcherConfig
	batches map[string][]proto.Message
}

func NewBristleBatcher(client *BristleClient, config BristleBatcherConfig) *BristleBatcher {
	return &BristleBatcher{
		client:  client,
		config:  config,
		batches: make(map[string][]proto.Message),
	}
}

var ErrBatchOversized = errors.New("batcher cannot handle oversized batch")
var ErrBufferFull = errors.New("batcher buffer is full")

// Run flushes buffered batches on the configured interval until the context
// is cancelled.
func (b *BristleBatcher) Run(ctx context.Context) error {
	ticker := time.NewTicker(b.config.FlushInterval)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			b.flush()
		case <-ctx.Done():
			return nil
		}
	}
}

func (b *BristleBatcher) flush() {
	// Swap out the buffered batches under the lock so writers can keep
	// appending while we flush.
	b.Lock()
	batches := b.batches
	b.batches = make(map[string][]proto.Message)
	b.Unlock()

	retryTimes := 0
	if b.config.Retry {
		retryTimes = -1
	}

	for messageTypeName, batch := range batches {
		err := b.client.WriteBatchSync(messageTypeName, batch, retryTimes)
		if err != nil {
			log.Error().
				Err(err).
				Str("type", messageTypeName).
				Int("batch-size", len(batch)).
				Msg("bristle-batcher: error on writing batch to upstream")
		}
	}
}

func (b *BristleBatcher) WriteBatch(messageType string, messages []proto.Message) error {
	b.Lock()
	defer b.Unlock()

	newBatchSize := len(messages)

	// A batch larger than the entire buffer can never be accepted
	if newBatchSize > b.config.BufferSize {
		return ErrBatchOversized
	}

	batch, ok := b.batches[messageType]
	if !ok {
		b.batches[messageType] = messages
		return nil
	}

	existingBatchSize := len(batch)

	// The buffer cannot absorb this batch until the next flush
	if existingBatchSize+newBatchSize > b.config.BufferSize {
		return ErrBufferFull
	}

	b.batches[messageType] = append(batch, messages...)
	return nil
}
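A minimal usage sketch of the client and batcher together, assuming the module path github.com/uplol/bristle implied by the proto import in client.go; the DSN and the registered type name v1.SomeMessage are placeholders:

package main

import (
	"context"
	"net/url"
	"time"

	"github.com/uplol/bristle/client"
	"google.golang.org/protobuf/proto"
)

func main() {
	// Placeholder DSN; only the host portion is used by the client's dialer.
	dsn, _ := url.Parse("bristle://localhost:9000")
	ctx := context.Background()

	c := client.NewBristleClient(dsn)
	go c.Run(ctx)

	b := client.NewBristleBatcher(c, client.BristleBatcherConfig{
		BufferSize:    10000,
		Retry:         true,
		FlushInterval: time.Second,
	})
	go b.Run(ctx)

	// "v1.SomeMessage" is a hypothetical registered type name.
	var msgs []proto.Message // concrete proto messages in real use
	_ = b.WriteBatch("v1.SomeMessage", msgs)
}

The batcher buffers per type and flushes on each tick, so WriteBatch returns quickly and the slow synchronous network write happens on the flush goroutine.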
173 changes: 173 additions & 0 deletions client/client.go
@@ -0,0 +1,173 @@
package client

import (
	"context"
	"errors"
	"fmt"
	"io"
	"net/url"
	"sync"
	"sync/atomic"
	"time"

	"github.com/rs/zerolog/log"
	v1 "github.com/uplol/bristle/proto/v1"
	"google.golang.org/grpc"
	"google.golang.org/protobuf/encoding/protowire"
	"google.golang.org/protobuf/proto"
)

type BristleClient struct {
	dsn    *url.URL
	client v1.BristleIngestService_StreamingClient

	// Millisecond timestamp until which we are backing off and no batches
	// should be sent (0 means no backoff)
	backoffUntil uint64
	writerLock   sync.Mutex
	idInc        uint32

	// Batch results
	results chan *v1.StreamingServerMessageWriteBatchResult

	outgoing chan *v1.StreamingClientMessage
}

func NewBristleClient(dsn *url.URL) *BristleClient {
	return &BristleClient{
		dsn:          dsn,
		backoffUntil: 0,
		idInc:        0,
		results:      make(chan *v1.StreamingServerMessageWriteBatchResult),
		outgoing:     make(chan *v1.StreamingClientMessage, 8),
	}
}

func (b *BristleClient) Run(ctx context.Context) {
	for {
		err := b.runClient(ctx)
		// Retry only while the context is still live (ctx.Err() becomes
		// non-nil once the context has been cancelled).
		if ctx.Err() == nil && err != nil {
			log.Error().Err(err).Msg("bristle-client: runClient encountered error, retrying in 5 seconds")
			time.Sleep(time.Second * 5)
			continue
		}
		return
	}
}

func (b *BristleClient) runClient(ctx context.Context) error {
	conn, err := grpc.Dial(b.dsn.Host, grpc.WithInsecure())
	if err != nil {
		return err
	}
	defer conn.Close()

	client := v1.NewBristleIngestServiceClient(conn)

	stream, err := client.Streaming(ctx)
	if err != nil {
		return err
	}

	// Writer goroutine: drains the outgoing channel onto the stream.
	go func() {
		for {
			message, ok := <-b.outgoing
			if !ok {
				return
			}

			err := stream.Send(message)
			if err == io.EOF {
				return
			} else if err != nil {
				log.Error().Err(err).Msg("bristle-client: encountered error writing")
				conn.Close()
				return
			}
		}
	}()

	for {
		in, err := stream.Recv()
		if err == io.EOF {
			return nil
		} else if err != nil {
			return err
		}

		switch v := in.Inner.(type) {
		case *v1.StreamingServerMessage_WriteBatchResult:
			log.Trace().
				Int("batch", int(v.WriteBatchResult.Id)).
				Int("result", int(v.WriteBatchResult.Result)).
				Msg("bristle-client: batch result received")
			b.results <- v.WriteBatchResult
		case *v1.StreamingServerMessage_Backoff:
			log.Trace().
				Int("until", int(v.Backoff.Until)).
				Interface("types", v.Backoff.Types).
				Msg("bristle-client: backoff request received")
			if v.Backoff.Until > atomic.LoadUint64(&b.backoffUntil) {
				atomic.StoreUint64(&b.backoffUntil, v.Backoff.Until)
			}
		}
	}
}

var BatchTooBig = errors.New("batch was too big for the upstream to handle")

func (b *BristleClient) WriteBatchSync(messageType string, messages []proto.Message, retryTimes int) error {
	b.writerLock.Lock()
	defer b.writerLock.Unlock()

	// Serialize the batch: each message becomes a length-delimited payload
	data := []byte{}

	for _, message := range messages {
		messageData, err := proto.Marshal(message)
		if err != nil {
			return err
		}

		data = protowire.AppendBytes(data, messageData)
	}

	for {
		now := uint64(time.Now().UnixNano() / int64(time.Millisecond))
		until := atomic.LoadUint64(&b.backoffUntil)
		if until > now {
			log.Trace().Int("until", int(until)).Int("now", int(now)).Msg("bristle-client: backing off")
			// Sleep out the remainder of the backoff window
			time.Sleep(time.Millisecond * time.Duration(until-now))
			continue
		}

		b.idInc += 1
		b.outgoing <- &v1.StreamingClientMessage{
			Inner: &v1.StreamingClientMessage_WriteBatch{
				WriteBatch: &v1.StreamingClientMessageWriteBatch{
					Id:   b.idInc,
					Type: messageType,
					Size: uint32(len(messages)),
					Data: data,
				},
			},
		}

		result := <-b.results
		if result.Result == v1.BatchResult_OK {
			return nil
		}

		// We can't retry this
		if result.Result == v1.BatchResult_TOO_BIG {
			return BatchTooBig
		}

		if retryTimes > 0 {
			retryTimes -= 1
			continue
		} else if retryTimes == -1 {
			continue
		}

		return fmt.Errorf("WriteBatchSync failed, result was %v", result.Result)
	}
}
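One protocol note worth calling out: the backoff message changed from a relative duration to an absolute until, a millisecond Unix timestamp on the same clock WriteBatchSync samples with UnixNano / Millisecond. A hedged sketch of how a server might populate it for a five-second backoff; the message construction and type name are assumptions, not code from this PR:

package main

import (
	"fmt"
	"time"

	v1 "github.com/uplol/bristle/proto/v1"
)

func main() {
	// Until uses the same millisecond-epoch clock the client compares
	// against before sending a batch.
	until := uint64(time.Now().Add(5*time.Second).UnixNano() / int64(time.Millisecond))

	backoff := &v1.StreamingServerMessageBackoff{
		Until: until,
		Types: []string{"v1.SomeMessage"}, // hypothetical type name
	}
	fmt.Println(backoff.Until, backoff.Types)
}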