Skip to content

Commit

Permalink
manager: initial commit
Browse files Browse the repository at this point in the history
Signed-off-by: Sander Pick <[email protected]>
  • Loading branch information
sanderpick committed Nov 21, 2019
1 parent 93b4c77 commit 5bbb4ba
Show file tree
Hide file tree
Showing 10 changed files with 223 additions and 21 deletions.
18 changes: 9 additions & 9 deletions api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,15 @@ type Server struct {
cancel context.CancelFunc
}

type Options struct {
type Config struct {
Addr string // defaults to 0.0.0.0:9090
ProxyAddr string // defaults to 0.0.0.0:9091
Debug bool
}

func NewServer(ctx context.Context, ds ds.Datastore, ts tserv.Threadservice, opts Options) (*Server, error) {
func NewServer(ctx context.Context, ds ds.Datastore, ts tserv.Threadservice, conf Config) (*Server, error) {
var err error
if opts.Debug {
if conf.Debug {
err = util.SetLogLevels(map[string]logger.Level{
"api": logger.DEBUG,
})
Expand All @@ -59,10 +59,10 @@ func NewServer(ctx context.Context, ds ds.Datastore, ts tserv.Threadservice, opt
cancel: cancel,
}

if opts.Addr == "" {
opts.Addr = "0.0.0.0:9090"
if conf.Addr == "" {
conf.Addr = "0.0.0.0:9090"
}
listener, err := net.Listen("tcp", opts.Addr)
listener, err := net.Listen("tcp", conf.Addr)
if err != nil {
return nil, err
}
Expand All @@ -80,11 +80,11 @@ func NewServer(ctx context.Context, ds ds.Datastore, ts tserv.Threadservice, opt
grpcweb.WithWebsocketOriginFunc(func(req *http.Request) bool {
return true
}))
if opts.ProxyAddr == "" {
opts.ProxyAddr = "0.0.0.0:9091"
if conf.ProxyAddr == "" {
conf.ProxyAddr = "0.0.0.0:9091"
}
s.proxy = &http.Server{
Addr: opts.ProxyAddr,
Addr: conf.ProxyAddr,
}
s.proxy.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if webrpc.IsGrpcWebRequest(r) ||
Expand Down
2 changes: 1 addition & 1 deletion api/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type service struct {
datastore ds.Datastore
threads tserv.Threadservice

stores map[string]*es.Store
storeManager
}

func (s *service) NewStore(ctx context.Context, req *pb.NewStoreRequest) (*pb.NewStoreReply, error) {
Expand Down
162 changes: 162 additions & 0 deletions eventstore/manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
package eventstore

import (
"context"
"fmt"
"io"

"github.com/google/uuid"
ds "github.com/ipfs/go-datastore"
kt "github.com/ipfs/go-datastore/keytransform"
"github.com/ipfs/go-datastore/query"
)

var (
dsStoreManagerBaseKey = ds.NewKey("/manager")
)

/*
NewManager: hydrate in-mem stores from prefixes and start them
NewStore: create a new store and prefix its datastore with base key, add to in-mem map
GetStore: return store from in-mem map as it might be started
Close: close all the in-mem stores
*/

type Manager struct {
io.Closer

ctx context.Context
cancel context.CancelFunc

config *StoreConfig

stores map[uuid.UUID]*Store

ownThreadService bool
}

func NewManager(opts ...StoreOption) (*Manager, error) {
config := &StoreConfig{}
for _, opt := range opts {
if err := opt(config); err != nil {
return nil, err
}
}

if config.Datastore == nil {
datastore, err := newDefaultDatastore(config.RepoPath)
if err != nil {
return nil, err
}
config.Datastore = datastore
}

// @todo: handle debug here or in store?

ctx, cancel := context.WithCancel(context.Background())
ownThreadService := false
if config.Threadservice == nil {
ownThreadService = true
s, err := newDefaultThreadservice(ctx, config.ListenPort, config.RepoPath, config.Debug)
if err != nil {
cancel()
return nil, err
}
config.Threadservice = s
}

m := &Manager{
ctx: ctx,
cancel: cancel,
config: config,
stores: make(map[uuid.UUID]*Store),
ownThreadService: ownThreadService,
}

q, err := m.config.Datastore.Query(query.Query{
Prefix: dsStoreManagerBaseKey.String(),
KeysOnly: true,
})
if err != nil {
return nil, err
}
defer q.Close()

for res := range q.Next() {
id := ds.RawKey(res.Key).Parent().Name()
fmt.Println(id)

//s, err := NewStore(m.opts...)
//if err != nil {
// return nil, err
//}

//store.datastore = kt.Wrap(store.datastore, kt.PrefixTransform{
// Prefix: res.Key,
//})
//
//m.stores[id] = store
}

return m, nil
}

func (m *Manager) NewStore() (id uuid.UUID, store *Store, err error) {
store, err = newStore(m.config)
if err != nil {
return
}

id, err = uuid.NewRandom()
if err != nil {
return
}
store.datastore = kt.Wrap(store.datastore, kt.PrefixTransform{
Prefix: dsStoreManagerBaseKey.ChildString(id.String()),
})

m.stores[id] = store
return id, store, nil
}

func (m *Manager) GetStore(id uuid.UUID) (*Store, error) {
prefix := dsStoreManagerBaseKey.ChildString(id.String())
q, err := m.config.Datastore.Query(query.Query{
Prefix: prefix.String(),
KeysOnly: true,
})
if err != nil {
return nil, err
}
defer q.Close()

if _, ok := q.NextSync(); !ok {
return nil, nil // not found
}

store, err := newStore(m.config)
if err != nil {
return nil, err
}

store.datastore = kt.Wrap(store.datastore, kt.PrefixTransform{
Prefix: prefix,
})

m.stores[id] = store
return store, nil
}

func (m *Manager) Close() (err error) {
m.cancel()
for _, s := range m.stores {
s.Close()
}
if err = m.config.Datastore.Close(); err != nil {
return
}
if m.ownThreadService {
m.config.Threadservice.Close()
}
return
}
36 changes: 36 additions & 0 deletions eventstore/manager_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package eventstore

import (
"io/ioutil"
"os"
"testing"
)

func TestManager_NewStore(t *testing.T) {
t.Parallel()
t.Run("Single", func(t *testing.T) {
t.Parallel()
man, clean := createTestManager(t)
defer clean()
id, _, err := man.NewStore()
checkErr(t, err)
log.Debugf("added store %s", id.String())
})
t.Run("Multiple", func(t *testing.T) {
t.Parallel()
man, clean := createTestManager(t)
defer clean()
_, _, err := man.NewStore()
checkErr(t, err)
_, _, err = man.NewStore()
checkErr(t, err)
})
}

func createTestManager(t *testing.T) (*Manager, func()) {
dir, err := ioutil.TempDir("", "")
checkErr(t, err)
m, err := NewManager(WithRepoPath(dir))
checkErr(t, err)
return m, func() { _ = os.RemoveAll(dir) }
}
2 changes: 1 addition & 1 deletion eventstore/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func (m *Model) Save(vs ...interface{}) error {
// Has returns true if all IDs exist in the model, false
// otherwise.
func (m *Model) Has(ids ...core.EntityID) (exists bool, err error) {
m.ReadTxn(func(txn *Txn) error {
_ = m.ReadTxn(func(txn *Txn) error {
exists, err = txn.Has(ids...)
return err
})
Expand Down
4 changes: 4 additions & 0 deletions eventstore/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ func NewStore(ts threadservice.Threadservice, opts ...StoreOption) (*Store, erro
return nil, err
}
}
return newStore(ts, config)
}

func newStore(ts threadservice.Threadservice, config *StoreConfig) (*Store, error) {
if config.Datastore == nil {
datastore, err := newDefaultDatastore(config.RepoPath)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion exe/daemon/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func main() {
defer ts.Close()
defer ds.Close()

server, err := api.NewServer(context.Background(), ds, ts, api.Options{
server, err := api.NewServer(context.Background(), ds, ts, api.Config{
Debug: true,
})
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion exe/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func Build(repo string, port int, proxyAddr string, debug bool) (

// Build a threadservice
util.SetupDefaultLoggingConfig(repop)
api, err = t.NewThreads(ctx, h, lite.BlockStore(), lite, tstore, t.Options{
api, err = t.NewThreads(ctx, h, lite.BlockStore(), lite, tstore, t.Config{
ProxyAddr: proxyAddr,
Debug: debug,
})
Expand Down
2 changes: 1 addition & 1 deletion test/threads_suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func newService(t *testing.T, listen ma.Multiaddr, proxyAddr string) tserv.Threa
bsrv.Blockstore(),
dag.NewDAGService(bsrv),
tstore.NewThreadstore(),
threads.Options{ProxyAddr: proxyAddr, Debug: true})
threads.Config{ProxyAddr: proxyAddr, Debug: true})
check(t, err)
return ts
}
Expand Down
14 changes: 7 additions & 7 deletions threads.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ type threads struct {
pullLocks map[thread.ID]chan struct{}
}

// Options is used to specify thread instance options.
type Options struct {
// Config is used to specify thread instance options.
type Config struct {
ProxyAddr string // defaults to 0.0.0.0:5050
Debug bool
}
Expand All @@ -81,10 +81,10 @@ func NewThreads(
bstore bs.Blockstore,
ds format.DAGService,
ts tstore.Threadstore,
opts Options,
conf Config,
) (tserv.Threadservice, error) {
var err error
if opts.Debug {
if conf.Debug {
err = util.SetLogLevels(map[string]logger.Level{
"threads": logger.DEBUG,
"threadstore": logger.DEBUG,
Expand Down Expand Up @@ -122,11 +122,11 @@ func NewThreads(

// Start a web RPC proxy
webrpc := grpcweb.WrapServer(t.rpc)
if opts.ProxyAddr == "" {
opts.ProxyAddr = "0.0.0.0:5050"
if conf.ProxyAddr == "" {
conf.ProxyAddr = "0.0.0.0:5050"
}
t.proxy = &http.Server{
Addr: opts.ProxyAddr,
Addr: conf.ProxyAddr,
}
t.proxy.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if webrpc.IsGrpcWebRequest(r) {
Expand Down

0 comments on commit 5bbb4ba

Please sign in to comment.