Skip to content

Commit

Permalink
Implement listener client and test
Browse files Browse the repository at this point in the history
Signed-off-by: Aaron Sutula <[email protected]>
  • Loading branch information
asutula committed Dec 17, 2019
1 parent acb061d commit 6abcf96
Show file tree
Hide file tree
Showing 5 changed files with 338 additions and 132 deletions.
74 changes: 57 additions & 17 deletions api/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/mr-tron/base58"
ma "github.com/multiformats/go-multiaddr"
"github.com/textileio/go-textile-core/crypto/symmetric"
"github.com/textileio/go-textile-core/store"
pb "github.com/textileio/go-textile-threads/api/pb"
es "github.com/textileio/go-textile-threads/eventstore"
"google.golang.org/grpc"
Expand All @@ -26,8 +27,8 @@ type Client struct {

// ListenEvent is used to send data or error values for Listen
type ListenEvent struct {
data []byte
err error
action es.Action
err error
}

// NewClient starts the client
Expand Down Expand Up @@ -231,22 +232,45 @@ func (c *Client) WriteTransaction(storeID, modelName string) (*WriteTransaction,
return &WriteTransaction{client: client, storeID: storeID, modelName: modelName}, nil
}

// Listen provides an update whenever the specified model is updated
func (c *Client) Listen(storeID, modelName, entityID string) (<-chan ListenEvent, func()) {
// Listen provides an update whenever the specified store, model type, or model instance is updated
func (c *Client) Listen(storeID string, listenOptions ...es.ListenOption) (<-chan ListenEvent, func(), error) {
channel := make(chan ListenEvent)
ctx, cancel := context.WithCancel(c.ctx)
go func() {
defer close(channel)
req := &pb.ListenRequest{
StoreID: storeID,
ModelName: modelName,
EntityID: entityID,
filters := make([]*pb.ListenRequest_Filter, len(listenOptions))
for i, listenOption := range listenOptions {
var action pb.ListenRequest_Filter_Action
switch listenOption.Type {
case es.ListenAll:
action = pb.ListenRequest_Filter_ALL
case es.ListenCreate:
action = pb.ListenRequest_Filter_CREATE
case es.ListenDelete:
action = pb.ListenRequest_Filter_DELETE
case es.ListenSave:
action = pb.ListenRequest_Filter_SAVE
default:
cancel()
return nil, nil, fmt.Errorf("unknown ListenOption.Type %v", listenOption.Type)
}
stream, err := c.client.Listen(ctx, req)
if err != nil {
channel <- ListenEvent{err: err}
return
filters[i] = &pb.ListenRequest_Filter{
ModelName: listenOption.Model,
EntityID: listenOption.ID.String(),
Action: action,
}
}
req := &pb.ListenRequest{
StoreID: storeID,
Filters: filters,
}
stream, err := c.client.Listen(ctx, req)
if err != nil {
cancel()
return nil, nil, err
}
go func() {
defer close(channel)

L:
for {
event, err := stream.Recv()
if err != nil {
Expand All @@ -256,11 +280,27 @@ func (c *Client) Listen(storeID, modelName, entityID string) (<-chan ListenEvent
}
break
}
bytes := []byte(event.GetEntity())
channel <- ListenEvent{data: bytes}
var actionType es.ActionType
switch event.GetAction() {
case pb.ListenReply_CREATE:
actionType = es.ActionCreate
case pb.ListenReply_DELETE:
actionType = es.ActionDelete
case pb.ListenReply_SAVE:
actionType = es.ActionSave
default:
channel <- ListenEvent{err: fmt.Errorf("unknown listen reply action %v", event.GetAction())}
break L
}
action := es.Action{
Model: event.GetModelName(),
ID: store.EntityID(event.GetEntityID()),
Type: actionType,
}
channel <- ListenEvent{action: action}
}
}()
return channel, cancel
return channel, cancel, nil
}

func processFindReply(reply *pb.ModelFindReply, dummySlice interface{}) (interface{}, error) {
Expand Down
41 changes: 27 additions & 14 deletions api/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package client

import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"os"
Expand All @@ -11,6 +10,7 @@ import (
"time"

ma "github.com/multiformats/go-multiaddr"
"github.com/textileio/go-textile-core/store"
"github.com/textileio/go-textile-threads/api"
es "github.com/textileio/go-textile-threads/eventstore"
"github.com/textileio/go-textile-threads/util"
Expand Down Expand Up @@ -395,8 +395,15 @@ func TestListen(t *testing.T) {
err = client.ModelCreate(storeID, modelName, person)
checkErr(t, err)

channel, discard := client.Listen(storeID, modelName, person.ID)
opt := es.ListenOption{
Model: modelName,
ID: store.EntityID(person.ID),
}
channel, discard, err := client.Listen(storeID, opt)
defer discard()
if err != nil {
t.Fatalf("failed to call listen: %v", err)
}

go func() {
time.Sleep(1 * time.Second)
Expand All @@ -413,12 +420,15 @@ func TestListen(t *testing.T) {
if val.err != nil {
t.Fatalf("failed to receive first listen result: %v", val.err)
}
p := &Person{}
if err := json.Unmarshal(val.data, p); err != nil {
t.Fatalf("failed to unmarshal listen result: %v", err)
}
if p.Age != 30 {
t.Fatalf("expected listen result age = 30 but got: %v", p.Age)
// p := &Person{}
// if err := json.Unmarshal(val.data, p); err != nil {
// t.Fatalf("failed to unmarshal listen result: %v", err)
// }
// if p.Age != 30 {
// t.Fatalf("expected listen result age = 30 but got: %v", p.Age)
// }
if val.action.ID.String() != person.ID {
t.Fatalf("expected listen result id = %v but got: %v", person.ID, val.action.ID.String())
}
}

Expand All @@ -429,12 +439,15 @@ func TestListen(t *testing.T) {
if val.err != nil {
t.Fatalf("failed to receive second listen result: %v", val.err)
}
p := &Person{}
if err := json.Unmarshal(val.data, p); err != nil {
t.Fatalf("failed to unmarshal listen result: %v", err)
}
if p.Age != 40 {
t.Fatalf("expected listen result age = 40 but got: %v", p.Age)
// p := &Person{}
// if err := json.Unmarshal(val.data, p); err != nil {
// t.Fatalf("failed to unmarshal listen result: %v", err)
// }
// if p.Age != 40 {
// t.Fatalf("expected listen result age = 40 but got: %v", p.Age)
// }
if val.action.ID.String() != person.ID {
t.Fatalf("expected listen result id = %v but got: %v", person.ID, val.action.ID.String())
}
}
}
Expand Down
Loading

0 comments on commit 6abcf96

Please sign in to comment.