From 328239563097970e722a4816f00021b2f53ee51f Mon Sep 17 00:00:00 2001 From: Michael Stergianis Date: Tue, 2 Mar 2021 15:35:43 -0500 Subject: [PATCH 1/9] Separate data streaming from websockets Signed-off-by: Michael Stergianis --- internal/api/api.go | 16 +- internal/api/api_test.go | 5 +- internal/api/fake/mock_client_factory.go | 72 ++++++++ internal/api/fake/mock_client_manager.go | 12 +- internal/api/fake/mock_streaming_client.go | 121 +++++++++++++ internal/api/streaming_connection_manager.go | 171 +++++++++++++++++++ internal/api/streaming_service.go | 44 +++++ internal/api/websocket_client.go | 71 +++++--- internal/api/websocket_client_manager.go | 155 ----------------- internal/api/websocket_connection_factory.go | 69 ++++++++ internal/api/websocket_service.go | 68 -------- internal/api/websocket_service_test.go | 6 +- internal/api/websocket_state.go | 5 +- pkg/dash/dash.go | 44 +++-- vendor/modules.txt | 3 +- 15 files changed, 575 insertions(+), 287 deletions(-) create mode 100644 internal/api/fake/mock_client_factory.go create mode 100644 internal/api/fake/mock_streaming_client.go create mode 100644 internal/api/streaming_connection_manager.go create mode 100644 internal/api/streaming_service.go delete mode 100644 internal/api/websocket_client_manager.go create mode 100644 internal/api/websocket_connection_factory.go delete mode 100644 internal/api/websocket_service.go diff --git a/internal/api/api.go b/internal/api/api.go index 32a67d22be..e45b144ba1 100644 --- a/internal/api/api.go +++ b/internal/api/api.go @@ -121,7 +121,7 @@ type API struct { prefix string dashConfig config.Dash logger log.Logger - wsClientManager *WebsocketClientManager + scManager *StreamingConnectionManager modulePaths map[string]module.Module modules []module.Module @@ -131,7 +131,7 @@ type API struct { var _ Service = (*API)(nil) // New creates an instance of API. -func New(ctx context.Context, prefix string, actionDispatcher ActionDispatcher, websocketClientManager *WebsocketClientManager, dashConfig config.Dash) *API { +func New(ctx context.Context, prefix string, actionDispatcher ActionDispatcher, streamingConnectionManager *StreamingConnectionManager, dashConfig config.Dash) *API { logger := dashConfig.Logger().With("component", "api") return &API{ ctx: ctx, @@ -141,7 +141,7 @@ func New(ctx context.Context, prefix string, actionDispatcher ActionDispatcher, dashConfig: dashConfig, logger: logger, forceUpdateCh: make(chan bool, 1), - wsClientManager: websocketClientManager, + scManager: streamingConnectionManager, } } @@ -160,7 +160,7 @@ func (a *API) Handler(ctx context.Context) (http.Handler, error) { s := router.PathPrefix(a.prefix).Subrouter() - s.Handle("/stream", websocketService(a.wsClientManager, a.dashConfig)) + s.Handle("/stream", streamService(a.scManager, a.dashConfig)) s.NotFoundHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { a.logger.Errorf("api handler not found: %s", r.URL.String()) @@ -177,7 +177,7 @@ type LoadingAPI struct { actionDispatcher ActionDispatcher prefix string logger log.Logger - wsClientManager *WebsocketClientManager + scManager *StreamingConnectionManager modulePaths map[string]module.Module modules []module.Module @@ -187,7 +187,7 @@ type LoadingAPI struct { var _ Service = (*LoadingAPI)(nil) // NewLoadingAPI creates an instance of LoadingAPI -func NewLoadingAPI(ctx context.Context, prefix string, actionDispatcher ActionDispatcher, websocketClientManager *WebsocketClientManager, logger log.Logger) *LoadingAPI { +func NewLoadingAPI(ctx context.Context, prefix string, actionDispatcher ActionDispatcher, websocketClientManager *StreamingConnectionManager, logger log.Logger) *LoadingAPI { logger = logger.With("component", "loading api") return &LoadingAPI{ ctx: ctx, @@ -196,7 +196,7 @@ func NewLoadingAPI(ctx context.Context, prefix string, actionDispatcher ActionDi modulePaths: make(map[string]module.Module), logger: logger, forceUpdateCh: make(chan bool, 1), - wsClientManager: websocketClientManager, + scManager: websocketClientManager, } } @@ -212,7 +212,7 @@ func (l *LoadingAPI) Handler(ctx context.Context) (http.Handler, error) { s := router.PathPrefix(l.prefix).Subrouter() - s.Handle("/stream", loadingWebsocketService(l.wsClientManager)) + s.Handle("/stream", loadingStreamService(l.scManager)) return router, nil } diff --git a/internal/api/api_test.go b/internal/api/api_test.go index f3ec31fc39..f0c323fa57 100644 --- a/internal/api/api_test.go +++ b/internal/api/api_test.go @@ -95,11 +95,12 @@ func TestAPI_routes(t *testing.T) { AnyTimes() actionDispatcher := apiFake.NewMockActionDispatcher(controller) + streamingClientFactory := apiFake.NewMockStreamingClientFactory(controller) ctx := context.Background() - wsClientManager := api.NewWebsocketClientManager(ctx, actionDispatcher) + scManager := api.NewStreamingConnectionManager(ctx, actionDispatcher, streamingClientFactory) - srv := api.New(ctx, "/", actionDispatcher, wsClientManager, dashConfig) + srv := api.New(ctx, "/", actionDispatcher, scManager, dashConfig) handler, err := srv.Handler(ctx) require.NoError(t, err) diff --git a/internal/api/fake/mock_client_factory.go b/internal/api/fake/mock_client_factory.go new file mode 100644 index 0000000000..418811dc72 --- /dev/null +++ b/internal/api/fake/mock_client_factory.go @@ -0,0 +1,72 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/vmware-tanzu/octant/internal/api (interfaces: StreamingClientFactory) + +// Package fake is a generated GoMock package. +package fake + +import ( + context "context" + http "net/http" + reflect "reflect" + + gomock "github.com/golang/mock/gomock" + uuid "github.com/google/uuid" + + api "github.com/vmware-tanzu/octant/internal/api" + config "github.com/vmware-tanzu/octant/internal/config" +) + +// MockStreamingClientFactory is a mock of StreamingClientFactory interface +type MockStreamingClientFactory struct { + ctrl *gomock.Controller + recorder *MockStreamingClientFactoryMockRecorder +} + +// MockStreamingClientFactoryMockRecorder is the mock recorder for MockStreamingClientFactory +type MockStreamingClientFactoryMockRecorder struct { + mock *MockStreamingClientFactory +} + +// NewMockStreamingClientFactory creates a new mock instance +func NewMockStreamingClientFactory(ctrl *gomock.Controller) *MockStreamingClientFactory { + mock := &MockStreamingClientFactory{ctrl: ctrl} + mock.recorder = &MockStreamingClientFactoryMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use +func (m *MockStreamingClientFactory) EXPECT() *MockStreamingClientFactoryMockRecorder { + return m.recorder +} + +// NewConnection mocks base method +func (m *MockStreamingClientFactory) NewConnection(arg0 uuid.UUID, arg1 http.ResponseWriter, arg2 *http.Request, arg3 *api.StreamingConnectionManager, arg4 config.Dash) (api.StreamingClient, context.CancelFunc, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "NewConnection", arg0, arg1, arg2, arg3, arg4) + ret0, _ := ret[0].(api.StreamingClient) + ret1, _ := ret[1].(context.CancelFunc) + ret2, _ := ret[2].(error) + return ret0, ret1, ret2 +} + +// NewConnection indicates an expected call of NewConnection +func (mr *MockStreamingClientFactoryMockRecorder) NewConnection(arg0, arg1, arg2, arg3, arg4 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NewConnection", reflect.TypeOf((*MockStreamingClientFactory)(nil).NewConnection), arg0, arg1, arg2, arg3, arg4) +} + +// NewTemporaryConnection mocks base method +func (m *MockStreamingClientFactory) NewTemporaryConnection(arg0 uuid.UUID, arg1 http.ResponseWriter, arg2 *http.Request, arg3 *api.StreamingConnectionManager) (api.StreamingClient, context.CancelFunc, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "NewTemporaryConnection", arg0, arg1, arg2, arg3) + ret0, _ := ret[0].(api.StreamingClient) + ret1, _ := ret[1].(context.CancelFunc) + ret2, _ := ret[2].(error) + return ret0, ret1, ret2 +} + +// NewTemporaryConnection indicates an expected call of NewTemporaryConnection +func (mr *MockStreamingClientFactoryMockRecorder) NewTemporaryConnection(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NewTemporaryConnection", reflect.TypeOf((*MockStreamingClientFactory)(nil).NewTemporaryConnection), arg0, arg1, arg2, arg3) +} diff --git a/internal/api/fake/mock_client_manager.go b/internal/api/fake/mock_client_manager.go index 0a87157321..8cdb672040 100644 --- a/internal/api/fake/mock_client_manager.go +++ b/internal/api/fake/mock_client_manager.go @@ -40,10 +40,10 @@ func (m *MockClientManager) EXPECT() *MockClientManagerMockRecorder { } // ClientFromRequest mocks base method -func (m *MockClientManager) ClientFromRequest(arg0 config.Dash, arg1 http.ResponseWriter, arg2 *http.Request) (*api.WebsocketClient, error) { +func (m *MockClientManager) ClientFromRequest(arg0 config.Dash, arg1 http.ResponseWriter, arg2 *http.Request) (api.StreamingClient, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "ClientFromRequest", arg0, arg1, arg2) - ret0, _ := ret[0].(*api.WebsocketClient) + ret0, _ := ret[0].(api.StreamingClient) ret1, _ := ret[1].(error) return ret0, ret1 } @@ -55,10 +55,10 @@ func (mr *MockClientManagerMockRecorder) ClientFromRequest(arg0, arg1, arg2 inte } // Clients mocks base method -func (m *MockClientManager) Clients() []*api.WebsocketClient { +func (m *MockClientManager) Clients() []api.StreamingClient { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Clients") - ret0, _ := ret[0].([]*api.WebsocketClient) + ret0, _ := ret[0].([]api.StreamingClient) return ret0 } @@ -95,10 +95,10 @@ func (mr *MockClientManagerMockRecorder) Run(arg0 interface{}) *gomock.Call { } // TemporaryClientFromLoadingRequest mocks base method -func (m *MockClientManager) TemporaryClientFromLoadingRequest(arg0 http.ResponseWriter, arg1 *http.Request) (*api.WebsocketClient, error) { +func (m *MockClientManager) TemporaryClientFromLoadingRequest(arg0 http.ResponseWriter, arg1 *http.Request) (api.StreamingClient, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "TemporaryClientFromLoadingRequest", arg0, arg1) - ret0, _ := ret[0].(*api.WebsocketClient) + ret0, _ := ret[0].(api.StreamingClient) ret1, _ := ret[1].(error) return ret0, ret1 } diff --git a/internal/api/fake/mock_streaming_client.go b/internal/api/fake/mock_streaming_client.go new file mode 100644 index 0000000000..6b195a523d --- /dev/null +++ b/internal/api/fake/mock_streaming_client.go @@ -0,0 +1,121 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/vmware-tanzu/octant/internal/api (interfaces: StreamingClient) + +// Package fake is a generated GoMock package. +package fake + +import ( + reflect "reflect" + + gomock "github.com/golang/mock/gomock" + + api "github.com/vmware-tanzu/octant/internal/api" + octant "github.com/vmware-tanzu/octant/internal/octant" + event "github.com/vmware-tanzu/octant/pkg/event" +) + +// MockStreamingClient is a mock of StreamingClient interface +type MockStreamingClient struct { + ctrl *gomock.Controller + recorder *MockStreamingClientMockRecorder +} + +// MockStreamingClientMockRecorder is the mock recorder for MockStreamingClient +type MockStreamingClientMockRecorder struct { + mock *MockStreamingClient +} + +// NewMockStreamingClient creates a new mock instance +func NewMockStreamingClient(ctrl *gomock.Controller) *MockStreamingClient { + mock := &MockStreamingClient{ctrl: ctrl} + mock.recorder = &MockStreamingClientMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use +func (m *MockStreamingClient) EXPECT() *MockStreamingClientMockRecorder { + return m.recorder +} + +// Handlers mocks base method +func (m *MockStreamingClient) Handlers() map[string][]octant.ClientRequestHandler { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Handlers") + ret0, _ := ret[0].(map[string][]octant.ClientRequestHandler) + return ret0 +} + +// Handlers indicates an expected call of Handlers +func (mr *MockStreamingClientMockRecorder) Handlers() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Handlers", reflect.TypeOf((*MockStreamingClient)(nil).Handlers)) +} + +// ID mocks base method +func (m *MockStreamingClient) ID() string { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ID") + ret0, _ := ret[0].(string) + return ret0 +} + +// ID indicates an expected call of ID +func (mr *MockStreamingClientMockRecorder) ID() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ID", reflect.TypeOf((*MockStreamingClient)(nil).ID)) +} + +// Receive mocks base method +func (m *MockStreamingClient) Receive() (api.StreamRequest, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Receive") + ret0, _ := ret[0].(api.StreamRequest) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Receive indicates an expected call of Receive +func (mr *MockStreamingClientMockRecorder) Receive() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Receive", reflect.TypeOf((*MockStreamingClient)(nil).Receive)) +} + +// Send mocks base method +func (m *MockStreamingClient) Send(arg0 event.Event) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Send", arg0) +} + +// Send indicates an expected call of Send +func (mr *MockStreamingClientMockRecorder) Send(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Send", reflect.TypeOf((*MockStreamingClient)(nil).Send), arg0) +} + +// State mocks base method +func (m *MockStreamingClient) State() octant.State { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "State") + ret0, _ := ret[0].(octant.State) + return ret0 +} + +// State indicates an expected call of State +func (mr *MockStreamingClientMockRecorder) State() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "State", reflect.TypeOf((*MockStreamingClient)(nil).State)) +} + +// StopCh mocks base method +func (m *MockStreamingClient) StopCh() <-chan struct{} { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "StopCh") + ret0, _ := ret[0].(<-chan struct{}) + return ret0 +} + +// StopCh indicates an expected call of StopCh +func (mr *MockStreamingClientMockRecorder) StopCh() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StopCh", reflect.TypeOf((*MockStreamingClient)(nil).StopCh)) +} diff --git a/internal/api/streaming_connection_manager.go b/internal/api/streaming_connection_manager.go new file mode 100644 index 0000000000..a50bd85bcd --- /dev/null +++ b/internal/api/streaming_connection_manager.go @@ -0,0 +1,171 @@ +/* + * Copyright (c) 2019 the Octant contributors. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +package api + +import ( + "context" + "net/http" + + "github.com/vmware-tanzu/octant/pkg/event" + + "github.com/google/uuid" + + "github.com/vmware-tanzu/octant/internal/config" + "github.com/vmware-tanzu/octant/internal/octant" +) + +//go:generate mockgen -destination=./fake/mock_client_manager.go -package=fake github.com/vmware-tanzu/octant/internal/api ClientManager +//go:generate mockgen -destination=./fake/mock_client_factory.go -package=fake github.com/vmware-tanzu/octant/internal/api StreamingClientFactory +//go:generate mockgen -destination=./fake/mock_streaming_client.go -package=fake github.com/vmware-tanzu/octant/internal/api StreamingClient + +// ClientManager is an interface for managing clients. +type ClientManager interface { + Run(ctx context.Context) + Clients() []StreamingClient + ClientFromRequest(dashConfig config.Dash, w http.ResponseWriter, r *http.Request) (StreamingClient, error) + TemporaryClientFromLoadingRequest(w http.ResponseWriter, r *http.Request) (StreamingClient, error) + Get(id string) event.WSEventSender +} + +type clientMeta struct { + cancelFunc context.CancelFunc + client StreamingClient +} + +type StreamingClientFactory interface { + NewConnection(uuid.UUID, http.ResponseWriter, *http.Request, *StreamingConnectionManager, config.Dash) (StreamingClient, context.CancelFunc, error) + NewTemporaryConnection(uuid.UUID, http.ResponseWriter, *http.Request, *StreamingConnectionManager) (StreamingClient, context.CancelFunc, error) +} + +// StreamingClient is the interface responsible for sending and receiving +// streaming data to a users session, usually in a browser. +type StreamingClient interface { + OctantClient + + Receive() (StreamRequest, error) + + Handlers() map[string][]octant.ClientRequestHandler + State() octant.State +} + +// StreamingConnectionManager is a client manager for streams. +type StreamingConnectionManager struct { + clientFactory StreamingClientFactory + + // clients is the currently registered clients. + clients map[StreamingClient]context.CancelFunc + + // Register registers requests from clients. + register chan *clientMeta + + // unregister unregisters request from clients. + unregister chan StreamingClient + + // list populates a client list + requestList chan bool + recvList chan []StreamingClient + + ctx context.Context + actionDispatcher ActionDispatcher +} + +var _ ClientManager = (*StreamingConnectionManager)(nil) + +// NewStreamingConnectionManager creates an instance of WebsocketClientManager. +func NewStreamingConnectionManager(ctx context.Context, dispatcher ActionDispatcher, clientFactory StreamingClientFactory) *StreamingConnectionManager { + return &StreamingConnectionManager{ + ctx: ctx, + clients: make(map[StreamingClient]context.CancelFunc), + register: make(chan *clientMeta), + unregister: make(chan StreamingClient), + requestList: make(chan bool), + recvList: make(chan []StreamingClient), + actionDispatcher: dispatcher, + clientFactory: clientFactory, + } +} + +func (m *StreamingConnectionManager) Clients() []StreamingClient { + m.requestList <- true + clients := <-m.recvList + return clients +} + +// Run runs the manager. It manages multiple websocket clients. +func (m *StreamingConnectionManager) Run(ctx context.Context) { + done := false + for !done { + select { + case <-ctx.Done(): + done = true + case meta := <-m.register: + m.clients[meta.client] = meta.cancelFunc + case client := <-m.unregister: + if cancelFunc, ok := m.clients[client]; ok { + cancelFunc() + delete(m.clients, client) + } + case <-m.requestList: + clients := []StreamingClient{} + for client := range m.clients { + clients = append(clients, client) + } + m.recvList <- clients + } + } +} + +// ClientFromRequest creates a websocket client from a http request. +func (m *StreamingConnectionManager) ClientFromRequest(dashConfig config.Dash, w http.ResponseWriter, r *http.Request) (StreamingClient, error) { + clientID, err := uuid.NewUUID() + if err != nil { + return nil, err + } + + client, cancel, err := m.clientFactory.NewConnection(clientID, w, r, m, dashConfig) + if err != nil { + return nil, err + } + m.register <- &clientMeta{ + cancelFunc: func() { + cancel() + m.unregister <- client + }, + client: client, + } + + return client, nil +} + +func (m *StreamingConnectionManager) TemporaryClientFromLoadingRequest(w http.ResponseWriter, r *http.Request) (StreamingClient, error) { + clientID, err := uuid.NewUUID() + if err != nil { + return nil, err + } + + client, cancel, err := m.clientFactory.NewTemporaryConnection(clientID, w, r, m) + if err != nil { + return nil, err + } + m.register <- &clientMeta{ + cancelFunc: func() { + cancel() + m.unregister <- client + }, + client: client, + } + + return client, nil +} + +func (m *StreamingConnectionManager) Get(id string) event.WSEventSender { + for _, client := range m.Clients() { + if id == client.ID() { + return client + } + } + return nil +} diff --git a/internal/api/streaming_service.go b/internal/api/streaming_service.go new file mode 100644 index 0000000000..49c085088b --- /dev/null +++ b/internal/api/streaming_service.go @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2019 the Octant contributors. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +package api + +import ( + "fmt" + "net/http" + + "github.com/vmware-tanzu/octant/internal/config" +) + +func streamService(manager ClientManager, dashConfig config.Dash) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + serveStreamingApi(manager, dashConfig, w, r) + } +} + +func serveStreamingApi(manager ClientManager, dashConfig config.Dash, w http.ResponseWriter, r *http.Request) { + _, err := manager.ClientFromRequest(dashConfig, w, r) + if err != nil { + if dashConfig != nil { + logger := dashConfig.Logger() + logger.WithErr(err).Errorf("create websocket client") + } + return + } +} + +// Create dummy websocketService and serveWebsocket +func loadingStreamService(manager ClientManager) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + serveLoadingStreamingApi(manager, w, r) + } +} + +func serveLoadingStreamingApi(manager ClientManager, w http.ResponseWriter, r *http.Request) { + _, err := manager.TemporaryClientFromLoadingRequest(w, r) + if err != nil { + fmt.Println("create loading websocket client") + } +} diff --git a/internal/api/websocket_client.go b/internal/api/websocket_client.go index c288b3976a..d3b8f84e7d 100644 --- a/internal/api/websocket_client.go +++ b/internal/api/websocket_client.go @@ -41,6 +41,11 @@ const ( maxMessageSize = 2 * 1024 * 1024 // 2MiB ) +type StreamRequest struct { + Type string `json:"type"` + Payload action.Payload `json:"payload"` +} + // WebsocketClient manages websocket clients. type WebsocketClient struct { conn *websocket.Conn @@ -49,7 +54,7 @@ type WebsocketClient struct { logger log.Logger ctx context.Context cancel context.CancelFunc - manager *WebsocketClientManager + manager *StreamingConnectionManager isOpen atomic.Value state octant.State @@ -59,9 +64,10 @@ type WebsocketClient struct { } var _ OctantClient = (*WebsocketClient)(nil) +var _ StreamingClient = (*WebsocketClient)(nil) // NewWebsocketClient creates an instance of WebsocketClient. -func NewWebsocketClient(ctx context.Context, conn *websocket.Conn, manager *WebsocketClientManager, dashConfig config.Dash, actionDispatcher ActionDispatcher, id uuid.UUID) *WebsocketClient { +func NewWebsocketClient(ctx context.Context, conn *websocket.Conn, manager *StreamingConnectionManager, dashConfig config.Dash, actionDispatcher ActionDispatcher, id uuid.UUID) *WebsocketClient { logger := dashConfig.Logger().With("component", "websocket-client", "client-id", id.String()) ctx = internalLog.WithLoggerContext(ctx, logger) @@ -98,7 +104,7 @@ func NewWebsocketClient(ctx context.Context, conn *websocket.Conn, manager *Webs } // NewTemporaryWebsocketClient creates an instance of WebsocketClient -func NewTemporaryWebsocketClient(ctx context.Context, conn *websocket.Conn, manager *WebsocketClientManager, actionDispatcher ActionDispatcher, id uuid.UUID) *WebsocketClient { +func NewTemporaryWebsocketClient(ctx context.Context, conn *websocket.Conn, manager *StreamingConnectionManager, actionDispatcher ActionDispatcher, id uuid.UUID) *WebsocketClient { ctx, cancel := context.WithCancel(ctx) logger := internalLog.From(ctx) @@ -130,6 +136,10 @@ func (c *WebsocketClient) ID() string { return c.id.String() } +func (c *WebsocketClient) Handlers() map[string][]octant.ClientRequestHandler { + return c.handlers +} + func (c *WebsocketClient) readPump() { defer func() { c.isOpen.Store(false) @@ -156,16 +166,13 @@ func (c *WebsocketClient) readPump() { }) for { - _, message, err := c.conn.ReadMessage() + request, err := c.Receive() if err != nil { - if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure, websocket.CloseNoStatusReceived) { - c.logger.WithErr(err).Errorf("Unhandled websocket error") - } - c.cancel() + c.logger.WithErr(err).Errorf("Unhandled websocket error") break } - if err := c.handle(message); err != nil { + if err := HandleStreamingMessage(c, request); err != nil { c.logger.WithErr(err).Errorf("Handle websocket message") } } @@ -173,27 +180,22 @@ func (c *WebsocketClient) readPump() { close(c.stopCh) } -func (c *WebsocketClient) handle(message []byte) error { - var request websocketRequest - if err := json.Unmarshal(message, &request); err != nil { - return err - } - - handlers, ok := c.handlers[request.Type] +func HandleStreamingMessage(client StreamingClient, request StreamRequest) error { + handlers, ok := client.Handlers()[request.Type] if !ok { - return c.handleUnknownRequest(request) + return handleUnknownRequest(client, request) } var g errgroup.Group for _, handler := range handlers { g.Go(func() error { - return handler.Handler(c.state, request.Payload) + return handler.Handler(client.State(), request.Payload) }) } if err := g.Wait(); err != nil { - c.Send(event.CreateEvent("handlerError", action.Payload{ + client.Send(event.CreateEvent("handlerError", action.Payload{ "requestType": request.Type, "error": err.Error(), })) @@ -203,7 +205,7 @@ func (c *WebsocketClient) handle(message []byte) error { return nil } -func (c *WebsocketClient) handleUnknownRequest(request websocketRequest) error { +func handleUnknownRequest(client OctantClient, request StreamRequest) error { message := "unknown request" if request.Type != "" { message = fmt.Sprintf("unknown request %s", request.Type) @@ -213,7 +215,7 @@ func (c *WebsocketClient) handleUnknownRequest(request websocketRequest) error { "message": message, "payload": request.Payload, } - c.Send(event.CreateEvent(event.EventTypeUnknown, m)) + client.Send(event.CreateEvent(event.EventTypeUnknown, m)) return nil } @@ -279,6 +281,28 @@ func (c *WebsocketClient) Send(ev event.Event) { } } +func (c *WebsocketClient) Receive() (StreamRequest, error) { + _, message, err := c.conn.ReadMessage() + if err != nil { + if websocket.IsUnexpectedCloseError( + err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure, websocket.CloseNoStatusReceived, + ) { + } + c.cancel() + } + + var request StreamRequest + if err := json.Unmarshal(message, &request); err != nil { + c.logger.WithErr(err).Errorf("Handle websocket message") + } + + return request, nil +} + +func (c *WebsocketClient) State() octant.State { + return c.state +} + // StopCh returns the client's stop channel. It will be closed when the WebsocketClient is closed. func (c *WebsocketClient) StopCh() <-chan struct{} { return c.stopCh @@ -287,8 +311,3 @@ func (c *WebsocketClient) StopCh() <-chan struct{} { func (c *WebsocketClient) RegisterHandler(handler octant.ClientRequestHandler) { c.handlers[handler.RequestType] = append(c.handlers[handler.RequestType], handler) } - -type websocketRequest struct { - Type string `json:"type"` - Payload action.Payload `json:"payload"` -} diff --git a/internal/api/websocket_client_manager.go b/internal/api/websocket_client_manager.go deleted file mode 100644 index 79ca78a595..0000000000 --- a/internal/api/websocket_client_manager.go +++ /dev/null @@ -1,155 +0,0 @@ -/* - * Copyright (c) 2019 the Octant contributors. All Rights Reserved. - * SPDX-License-Identifier: Apache-2.0 - */ - -package api - -import ( - "context" - "net/http" - - "github.com/vmware-tanzu/octant/pkg/event" - - "github.com/google/uuid" - - "github.com/vmware-tanzu/octant/internal/config" -) - -//go:generate mockgen -destination=./fake/mock_client_manager.go -package=fake github.com/vmware-tanzu/octant/internal/api ClientManager - -// ClientManager is an interface for managing clients. -type ClientManager interface { - Run(ctx context.Context) - Clients() []*WebsocketClient - ClientFromRequest(dashConfig config.Dash, w http.ResponseWriter, r *http.Request) (*WebsocketClient, error) - TemporaryClientFromLoadingRequest(w http.ResponseWriter, r *http.Request) (*WebsocketClient, error) - Get(id string) event.WSEventSender -} - -type clientMeta struct { - cancelFunc context.CancelFunc - client *WebsocketClient -} - -// WebsocketClientManager is a client manager for websockets. -type WebsocketClientManager struct { - // clients is the currently registered clients. - clients map[*WebsocketClient]context.CancelFunc - - // Register registers requests from clients. - register chan *clientMeta - - // unregister unregisters request from clients. - unregister chan *WebsocketClient - - // list populates a client list - requestList chan bool - recvList chan []*WebsocketClient - - ctx context.Context - actionDispatcher ActionDispatcher -} - -var _ ClientManager = (*WebsocketClientManager)(nil) - -// NewWebsocketClientManager creates an instance of WebsocketClientManager. -func NewWebsocketClientManager(ctx context.Context, dispatcher ActionDispatcher) *WebsocketClientManager { - return &WebsocketClientManager{ - ctx: ctx, - clients: make(map[*WebsocketClient]context.CancelFunc), - register: make(chan *clientMeta), - unregister: make(chan *WebsocketClient), - requestList: make(chan bool), - recvList: make(chan []*WebsocketClient), - actionDispatcher: dispatcher, - } -} - -func (m *WebsocketClientManager) Clients() []*WebsocketClient { - m.requestList <- true - clients := <-m.recvList - return clients -} - -// Run runs the manager. It manages multiple websocket clients. -func (m *WebsocketClientManager) Run(ctx context.Context) { - done := false - for !done { - select { - case <-ctx.Done(): - done = true - case meta := <-m.register: - m.clients[meta.client] = meta.cancelFunc - case client := <-m.unregister: - if cancelFunc, ok := m.clients[client]; ok { - cancelFunc() - delete(m.clients, client) - } - case <-m.requestList: - clients := []*WebsocketClient{} - for client := range m.clients { - clients = append(clients, client) - } - m.recvList <- clients - } - } -} - -// ClientFromRequest creates a websocket client from a http request. -func (m *WebsocketClientManager) ClientFromRequest(dashConfig config.Dash, w http.ResponseWriter, r *http.Request) (*WebsocketClient, error) { - clientID, err := uuid.NewUUID() - if err != nil { - return nil, err - } - - conn, err := upgrader.Upgrade(w, r, nil) - if err != nil { - return nil, err - } - - ctx, cancel := context.WithCancel(m.ctx) - client := NewWebsocketClient(ctx, conn, m, dashConfig, m.actionDispatcher, clientID) - m.register <- &clientMeta{ - cancelFunc: func() { - cancel() - m.unregister <- client - }, - client: client, - } - - return client, nil -} - -func (m *WebsocketClientManager) TemporaryClientFromLoadingRequest(w http.ResponseWriter, r *http.Request) (*WebsocketClient, error) { - clientID, err := uuid.NewUUID() - if err != nil { - return nil, err - } - - conn, err := upgrader.Upgrade(w, r, nil) - if err != nil { - return nil, err - } - - ctx, cancel := context.WithCancel(m.ctx) - client := NewTemporaryWebsocketClient(ctx, conn, m, m.actionDispatcher, clientID) - m.register <- &clientMeta{ - cancelFunc: func() { - cancel() - m.unregister <- client - }, - client: client, - } - - return client, nil -} - -func (m *WebsocketClientManager) Get(id string) event.WSEventSender { - for _, client := range m.Clients() { - if id == client.ID() { - return client - } - } - return nil -} diff --git a/internal/api/websocket_connection_factory.go b/internal/api/websocket_connection_factory.go new file mode 100644 index 0000000000..18ddacc48b --- /dev/null +++ b/internal/api/websocket_connection_factory.go @@ -0,0 +1,69 @@ +package api + +import ( + "context" + "net" + "net/http" + + "github.com/google/uuid" + "github.com/gorilla/websocket" + + "github.com/vmware-tanzu/octant/internal/config" +) + +type WebsocketConnectionFactory struct { + upgrader websocket.Upgrader +} + +func NewWebsocketConnectionFactory() *WebsocketConnectionFactory { + return &WebsocketConnectionFactory{ + upgrader: websocket.Upgrader{ + ReadBufferSize: 1024, + WriteBufferSize: 1024, + CheckOrigin: func(r *http.Request) bool { + host, _, err := net.SplitHostPort(r.RemoteAddr) + if err != nil { + return false + } + + return shouldAllowHost(host, acceptedHosts()) + }, + }, + } +} + +var _ StreamingClientFactory = (*WebsocketConnectionFactory)(nil) + +func (wcf *WebsocketConnectionFactory) NewConnection( + clientID uuid.UUID, w http.ResponseWriter, r *http.Request, m *StreamingConnectionManager, dashConfig config.Dash, +) (StreamingClient, context.CancelFunc, error) { + conn, err := wcf.upgrader.Upgrade(w, r, nil) + if err != nil { + return nil, nil, err + } + + ctx, cancel := context.WithCancel(m.ctx) + client := NewWebsocketClient(ctx, conn, m, dashConfig, m.actionDispatcher, clientID) + + go client.readPump() + go client.writePump() + + return client, cancel, nil +} + +func (wcf *WebsocketConnectionFactory) NewTemporaryConnection( + clientID uuid.UUID, w http.ResponseWriter, r *http.Request, m *StreamingConnectionManager, +) (StreamingClient, context.CancelFunc, error) { + conn, err := wcf.upgrader.Upgrade(w, r, nil) + if err != nil { + return nil, nil, err + } + + ctx, cancel := context.WithCancel(m.ctx) + client := NewTemporaryWebsocketClient(ctx, conn, m, m.actionDispatcher, clientID) + + go client.readPump() + go client.writePump() + + return client, cancel, nil +} diff --git a/internal/api/websocket_service.go b/internal/api/websocket_service.go deleted file mode 100644 index 603745fa27..0000000000 --- a/internal/api/websocket_service.go +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Copyright (c) 2019 the Octant contributors. All Rights Reserved. - * SPDX-License-Identifier: Apache-2.0 - */ - -package api - -import ( - "fmt" - "net" - "net/http" - - "github.com/gorilla/websocket" - - "github.com/vmware-tanzu/octant/internal/config" -) - -var ( - upgrader = websocket.Upgrader{ - ReadBufferSize: 1024, - WriteBufferSize: 1024, - CheckOrigin: func(r *http.Request) bool { - host, _, err := net.SplitHostPort(r.RemoteAddr) - if err != nil { - return false - } - - return shouldAllowHost(host, acceptedHosts()) - }, - } -) - -func websocketService(manager ClientManager, dashConfig config.Dash) http.HandlerFunc { - return func(w http.ResponseWriter, r *http.Request) { - serveWebsocket(manager, dashConfig, w, r) - } -} - -func serveWebsocket(manager ClientManager, dashConfig config.Dash, w http.ResponseWriter, r *http.Request) { - client, err := manager.ClientFromRequest(dashConfig, w, r) - if err != nil { - if dashConfig != nil { - logger := dashConfig.Logger() - logger.WithErr(err).Errorf("create websocket client") - } - return - } - - go client.readPump() - go client.writePump() -} - -// Create dummy websocketService and serveWebsocket -func loadingWebsocketService(manager ClientManager) http.HandlerFunc { - return func(w http.ResponseWriter, r *http.Request) { - serveLoadingWebsocket(manager, w, r) - } -} - -func serveLoadingWebsocket(manager ClientManager, w http.ResponseWriter, r *http.Request) { - client, err := manager.TemporaryClientFromLoadingRequest(w, r) - if err != nil { - fmt.Println("create loading websocket client") - } - - go client.readPump() - go client.writePump() -} diff --git a/internal/api/websocket_service_test.go b/internal/api/websocket_service_test.go index b72c78982b..36546466e7 100644 --- a/internal/api/websocket_service_test.go +++ b/internal/api/websocket_service_test.go @@ -9,14 +9,14 @@ import ( ) type fakeWebsocketClientManager struct { - WebsocketClientManager + StreamingConnectionManager } -func (c *fakeWebsocketClientManager) ClientFromRequest(dashConfig config.Dash, w http.ResponseWriter, r *http.Request) (*WebsocketClient, error) { +func (c *fakeWebsocketClientManager) ClientFromRequest(dashConfig config.Dash, w http.ResponseWriter, r *http.Request) (StreamingClient, error) { return nil, fmt.Errorf("test: error") } func TestWebsocketService_serveWebsocket(t *testing.T) { f := &fakeWebsocketClientManager{} - serveWebsocket(f, nil, nil, nil) + serveStreamingApi(f, nil, nil, nil) } diff --git a/internal/api/websocket_state.go b/internal/api/websocket_state.go index f0c91ea3f7..fad736c576 100644 --- a/internal/api/websocket_state.go +++ b/internal/api/websocket_state.go @@ -54,9 +54,10 @@ func defaultStateManagers(clientID string, dashConfig config.Dash) []StateManage } } -// OctantClient is an OctantClient. +// OctantClient is the interface responsible for sending streaming data to a +// users session, usually in a browser. type OctantClient interface { - Send(event event.Event) + Send(event.Event) ID() string StopCh() <-chan struct{} } diff --git a/pkg/dash/dash.go b/pkg/dash/dash.go index fd898d49fc..38fa046831 100644 --- a/pkg/dash/dash.go +++ b/pkg/dash/dash.go @@ -71,6 +71,7 @@ type Options struct { Listener net.Listener clusterClient cluster.ClientInterface factory dynamicinformer.DynamicSharedInformerFactory + streamingClientFactory api.StreamingClientFactory } type RunnerOption struct { @@ -215,15 +216,23 @@ func WithClusterClient(client cluster.ClientInterface) RunnerOption { } } +func WithStreamingClientFactory(factory api.StreamingClientFactory) RunnerOption { + return RunnerOption{ + nonClusterOption: func(o *Options) { + o.streamingClientFactory = factory + }, + } +} + type Runner struct { - ctx context.Context - dash *dash - pluginManager *plugin.Manager - moduleManager *module.Manager - actionManager *action.Manager - websocketClientManager *api.WebsocketClientManager - apiCreated bool - fs afero.Fs + ctx context.Context + dash *dash + pluginManager *plugin.Manager + moduleManager *module.Manager + actionManager *action.Manager + streamingConnectionManager *api.StreamingConnectionManager + apiCreated bool + fs afero.Fs } func NewRunner(ctx context.Context, logger log.Logger, opts ...RunnerOption) (*Runner, error) { @@ -244,9 +253,14 @@ func NewRunner(ctx context.Context, logger log.Logger, opts ...RunnerOption) (*R actionManger := action.NewManager(logger) r.actionManager = actionManger - websocketClientManager := api.NewWebsocketClientManager(ctx, r.actionManager) - r.websocketClientManager = websocketClientManager - go websocketClientManager.Run(ctx) + var streamingConnectionManager *api.StreamingConnectionManager + if options.streamingClientFactory != nil { + streamingConnectionManager = api.NewStreamingConnectionManager(ctx, r.actionManager, options.streamingClientFactory) + } else { + streamingConnectionManager = api.NewStreamingConnectionManager(ctx, r.actionManager, api.NewWebsocketConnectionFactory()) + } + r.streamingConnectionManager = streamingConnectionManager + go streamingConnectionManager.Run(ctx) var err error @@ -287,7 +301,7 @@ func (r *Runner) apiFromKubeConfig(kubeConfig string, opts ...RunnerOption) (api return r.initAPI(r.ctx, logger, opts...) } else { logger.Infof("no valid kube config found, initializing loading API") - return api.NewLoadingAPI(r.ctx, api.PathPrefix, r.actionManager, r.websocketClientManager, logger), nil, nil + return api.NewLoadingAPI(r.ctx, api.PathPrefix, r.actionManager, r.streamingConnectionManager, logger), nil, nil } } @@ -439,10 +453,10 @@ func (r *Runner) initAPI(ctx context.Context, logger log.Logger, opts ...RunnerO PortForwarder: portForwarder, NamespaceInterface: nsClient, FrontendProxy: frontendProxy, - WebsocketClientManager: r.websocketClientManager, + WebsocketClientManager: r.streamingConnectionManager, } - pluginManager, err := initPlugin(moduleManager, r.actionManager, r.websocketClientManager, pluginDashboardService) + pluginManager, err := initPlugin(moduleManager, r.actionManager, r.streamingConnectionManager, pluginDashboardService) if err != nil { return nil, nil, fmt.Errorf("initializing plugin manager: %w", err) } @@ -501,7 +515,7 @@ func (r *Runner) initAPI(ctx context.Context, logger log.Logger, opts ...RunnerO return nil, nil, fmt.Errorf("unable to start CRD watcher: %w", err) } - apiService := api.New(ctx, api.PathPrefix, r.actionManager, r.websocketClientManager, dashConfig) + apiService := api.New(ctx, api.PathPrefix, r.actionManager, r.streamingConnectionManager, dashConfig) frontendProxy.FrontendUpdateController = apiService r.apiCreated = true diff --git a/vendor/modules.txt b/vendor/modules.txt index de69d61e28..9dcda58df8 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -385,8 +385,7 @@ google.golang.org/grpc/serviceconfig google.golang.org/grpc/stats google.golang.org/grpc/status google.golang.org/grpc/tap -# google.golang.org/protobuf v1.26.0 -## explicit +google.golang.org/protobuf v1.26.0 google.golang.org/protobuf/cmd/protoc-gen-go/internal_gengo google.golang.org/protobuf/compiler/protogen google.golang.org/protobuf/encoding/prototext From fdb5e6c09cb404ed8c0d93b6bd0a495ccdd7afa0 Mon Sep 17 00:00:00 2001 From: Michael Stergianis Date: Thu, 4 Mar 2021 09:53:21 -0500 Subject: [PATCH 2/9] Fix error when closing browser tab There was some incorrect error handling in my last commit. When fixing that I noticed that I slightly changed the semantics of the websocket read pump when I moved unmarshaling to be a client.Recieve responsibility. So I introduced a new concept that a stream client can return a fatal error, and read pump can react to that appropriately. This paves the way for read pump to become a function owned by the streaming client manager instead of a method on websocket client. Signed-off-by: Michael Stergianis --- internal/api/streaming_connection_errors.go | 22 +++++++++++++++++++++ internal/api/websocket_client.go | 14 +++++++++---- 2 files changed, 32 insertions(+), 4 deletions(-) create mode 100644 internal/api/streaming_connection_errors.go diff --git a/internal/api/streaming_connection_errors.go b/internal/api/streaming_connection_errors.go new file mode 100644 index 0000000000..6ad997d4bc --- /dev/null +++ b/internal/api/streaming_connection_errors.go @@ -0,0 +1,22 @@ +package api + +type StreamError struct { + error + Fatal bool +} + +func FatalStreamError(err error) error { + return StreamError{ + err, + true, + } +} + +func IsFatalStreamError(err error) bool { + sErr, ok := err.(StreamError) + if !ok { + return false + } + + return sErr.Fatal +} diff --git a/internal/api/websocket_client.go b/internal/api/websocket_client.go index d3b8f84e7d..488f4794c1 100644 --- a/internal/api/websocket_client.go +++ b/internal/api/websocket_client.go @@ -168,8 +168,12 @@ func (c *WebsocketClient) readPump() { for { request, err := c.Receive() if err != nil { - c.logger.WithErr(err).Errorf("Unhandled websocket error") - break + if IsFatalStreamError(err) { + c.cancel() + break + } + + continue } if err := HandleStreamingMessage(c, request); err != nil { @@ -287,13 +291,15 @@ func (c *WebsocketClient) Receive() (StreamRequest, error) { if websocket.IsUnexpectedCloseError( err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure, websocket.CloseNoStatusReceived, ) { + c.logger.WithErr(err).Errorf("Unhandled websocket error") } - c.cancel() + return StreamRequest{}, FatalStreamError(err) } var request StreamRequest if err := json.Unmarshal(message, &request); err != nil { - c.logger.WithErr(err).Errorf("Handle websocket message") + c.logger.WithErr(err).Errorf("Unmarshaling websocket message") + return StreamRequest{}, err } return request, nil From 8c742c1e951aac7f6bf4308476c6dbf75681a10c Mon Sep 17 00:00:00 2001 From: Michael Stergianis Date: Wed, 17 Mar 2021 18:01:40 -0400 Subject: [PATCH 3/9] Make handleStreamingMessage unexported - change for loop semantics in (*StreamingConnectionManager).Run Signed-off-by: Michael Stergianis --- internal/api/streaming_connection_manager.go | 5 ++--- internal/api/websocket_client.go | 4 ++-- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/internal/api/streaming_connection_manager.go b/internal/api/streaming_connection_manager.go index a50bd85bcd..8b03151762 100644 --- a/internal/api/streaming_connection_manager.go +++ b/internal/api/streaming_connection_manager.go @@ -96,11 +96,10 @@ func (m *StreamingConnectionManager) Clients() []StreamingClient { // Run runs the manager. It manages multiple websocket clients. func (m *StreamingConnectionManager) Run(ctx context.Context) { - done := false - for !done { + for { select { case <-ctx.Done(): - done = true + return case meta := <-m.register: m.clients[meta.client] = meta.cancelFunc case client := <-m.unregister: diff --git a/internal/api/websocket_client.go b/internal/api/websocket_client.go index 488f4794c1..9f2c0c6564 100644 --- a/internal/api/websocket_client.go +++ b/internal/api/websocket_client.go @@ -176,7 +176,7 @@ func (c *WebsocketClient) readPump() { continue } - if err := HandleStreamingMessage(c, request); err != nil { + if err := handleStreamingMessage(c, request); err != nil { c.logger.WithErr(err).Errorf("Handle websocket message") } } @@ -184,7 +184,7 @@ func (c *WebsocketClient) readPump() { close(c.stopCh) } -func HandleStreamingMessage(client StreamingClient, request StreamRequest) error { +func handleStreamingMessage(client StreamingClient, request StreamRequest) error { handlers, ok := client.Handlers()[request.Type] if !ok { return handleUnknownRequest(client, request) From cfb4958d7943ab617962dbbeeb258fb80790151f Mon Sep 17 00:00:00 2001 From: Wayne Witzel III Date: Fri, 28 May 2021 12:52:52 -0400 Subject: [PATCH 4/9] explict module dep Signed-off-by: Wayne Witzel III --- vendor/modules.txt | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/vendor/modules.txt b/vendor/modules.txt index 9dcda58df8..de69d61e28 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -385,7 +385,8 @@ google.golang.org/grpc/serviceconfig google.golang.org/grpc/stats google.golang.org/grpc/status google.golang.org/grpc/tap -google.golang.org/protobuf v1.26.0 +# google.golang.org/protobuf v1.26.0 +## explicit google.golang.org/protobuf/cmd/protoc-gen-go/internal_gengo google.golang.org/protobuf/compiler/protogen google.golang.org/protobuf/encoding/prototext From 89084044d818557c3922d29afd5bcef77e9355a7 Mon Sep 17 00:00:00 2001 From: Michael Stergianis Date: Wed, 9 Jun 2021 10:51:33 -0400 Subject: [PATCH 5/9] Move StreamError to pkg/errors Allows library implementations of StreamClient and StreamClientFactory to return fatal StreamErrors Signed-off-by: Michael Stergianis --- internal/api/streaming_connection_errors.go | 22 ------- internal/api/websocket_client.go | 5 +- pkg/errors/streaming_connection_error.go | 39 +++++++++++ pkg/errors/streaming_connection_error_test.go | 65 +++++++++++++++++++ 4 files changed, 107 insertions(+), 24 deletions(-) delete mode 100644 internal/api/streaming_connection_errors.go create mode 100644 pkg/errors/streaming_connection_error.go create mode 100644 pkg/errors/streaming_connection_error_test.go diff --git a/internal/api/streaming_connection_errors.go b/internal/api/streaming_connection_errors.go deleted file mode 100644 index 6ad997d4bc..0000000000 --- a/internal/api/streaming_connection_errors.go +++ /dev/null @@ -1,22 +0,0 @@ -package api - -type StreamError struct { - error - Fatal bool -} - -func FatalStreamError(err error) error { - return StreamError{ - err, - true, - } -} - -func IsFatalStreamError(err error) bool { - sErr, ok := err.(StreamError) - if !ok { - return false - } - - return sErr.Fatal -} diff --git a/internal/api/websocket_client.go b/internal/api/websocket_client.go index 9f2c0c6564..1f47461e00 100644 --- a/internal/api/websocket_client.go +++ b/internal/api/websocket_client.go @@ -12,6 +12,7 @@ import ( "time" "github.com/vmware-tanzu/octant/internal/util/json" + "github.com/vmware-tanzu/octant/pkg/errors" "github.com/vmware-tanzu/octant/pkg/event" @@ -168,7 +169,7 @@ func (c *WebsocketClient) readPump() { for { request, err := c.Receive() if err != nil { - if IsFatalStreamError(err) { + if errors.IsFatalStreamError(err) { c.cancel() break } @@ -293,7 +294,7 @@ func (c *WebsocketClient) Receive() (StreamRequest, error) { ) { c.logger.WithErr(err).Errorf("Unhandled websocket error") } - return StreamRequest{}, FatalStreamError(err) + return StreamRequest{}, errors.FatalStreamError(err) } var request StreamRequest diff --git a/pkg/errors/streaming_connection_error.go b/pkg/errors/streaming_connection_error.go new file mode 100644 index 0000000000..0a87c699fc --- /dev/null +++ b/pkg/errors/streaming_connection_error.go @@ -0,0 +1,39 @@ +package errors + +import "github.com/vmware-tanzu/octant/internal/errors" + +type StreamError struct { + *errors.GenericError + Fatal bool +} + +func NewStreamError(err error) *StreamError { + return &StreamError{ + errors.NewGenericError(err), + false, + } +} + +func FatalStreamError(err error) *StreamError { + return &StreamError{ + errors.NewGenericError(err), + true, + } +} + +func IsFatalStreamError(err error) bool { + switch sErr := err.(type) { + case StreamError: + return sErr.Fatal + case *StreamError: + return sErr.Fatal + default: + return false + } +} + +const StreamingConnectionError = "StreamingConnectionError" + +func (s *StreamError) Name() string { + return StreamingConnectionError +} diff --git a/pkg/errors/streaming_connection_error_test.go b/pkg/errors/streaming_connection_error_test.go new file mode 100644 index 0000000000..24c8293125 --- /dev/null +++ b/pkg/errors/streaming_connection_error_test.go @@ -0,0 +1,65 @@ +package errors + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestConstructors(t *testing.T) { + err := fmt.Errorf("encountered an error while streaming") + table := []struct { + name string + sErr *StreamError + fatal bool + }{ + {"NewStreamError", NewStreamError(err), false}, + {"FatalStreamError", FatalStreamError(err), true}, + } + + for _, test := range table { + t.Run(test.name, func(t *testing.T) { + assert.NotEmpty(t, test.sErr.Timestamp()) + assert.Equal(t, test.sErr.Name(), StreamingConnectionError) + assert.NotZero(t, test.sErr.ID()) + assert.Equal(t, test.fatal, test.sErr.Fatal) + assert.Equal(t, err.Error(), test.sErr.Error()) + }) + } +} + +func TestIsFatalStreamError(t *testing.T) { + table := []struct { + name string + sErr error + expected bool + }{ + { + "Expect IsFatalStreamError to be false for a standard stream error", + NewStreamError(fmt.Errorf("")), + false, + }, + { + "Expect IsFatalStreamError to be true for fatal stream error", + FatalStreamError(fmt.Errorf("")), + true, + }, + { + "A non pointer fatal stream error is provided", + *FatalStreamError(fmt.Errorf("")), + true, + }, + { + "A generic error is provided", + fmt.Errorf(""), + false, + }, + } + + for _, test := range table { + t.Run(test.name, func(t *testing.T) { + assert.Equal(t, test.expected, IsFatalStreamError(test.sErr)) + }) + } +} From 863a7a5272a1256918ada5901fdd3844049a2738 Mon Sep 17 00:00:00 2001 From: Michael Stergianis Date: Wed, 9 Jun 2021 16:33:34 -0400 Subject: [PATCH 6/9] Move streaming interfaces and websocket implementation to pkg Signed-off-by: Michael Stergianis --- internal/api/action_request_manager.go | 3 +- internal/api/api.go | 19 +++--- internal/api/api_test.go | 7 ++- internal/api/container_logs.go | 5 +- internal/api/container_logs_test.go | 3 +- internal/api/content_manager.go | 5 +- internal/api/content_manager_test.go | 16 ++++- internal/api/context_manager.go | 5 +- internal/api/context_manager_test.go | 2 +- internal/api/filter_manager.go | 3 +- internal/api/helper_manager.go | 3 +- internal/api/helper_manager_test.go | 2 +- internal/api/loading_state.go | 9 +-- internal/api/loading_state_test.go | 2 +- internal/api/middleware.go | 2 +- internal/api/middleware_test.go | 4 +- internal/api/namespaces_manager.go | 5 +- internal/api/namespaces_manager_test.go | 2 +- internal/api/navigation_manager.go | 5 +- internal/api/navigation_manager_test.go | 2 +- internal/api/state_manager.go | 10 +++ internal/api/streaming_service.go | 9 +-- internal/api/terminal_manager.go | 5 +- internal/api/terminal_manager_test.go | 2 +- internal/api/websocket_service_test.go | 5 +- {internal => pkg}/api/action.go | 2 +- .../api/fake/mock_action_dispatcher.go | 2 +- .../api/fake/mock_client_factory.go | 8 +-- .../api/fake/mock_client_manager.go | 32 +++++++++- .../api/fake/mock_octant_client.go | 2 +- .../api/fake/mock_state_manager.go | 4 +- .../api/fake/mock_streaming_client.go | 4 +- pkg/api/state_manager.go | 19 ++++++ pkg/api/streaming.go | 62 +++++++++++++++++++ .../api/streaming_connection_manager.go | 52 +++++----------- .../api/websockets}/websocket_client.go | 31 ++++------ .../api/websockets}/websocket_client_test.go | 2 +- .../websocket_connection_factory.go | 24 +++---- .../api/websockets}/websocket_state.go | 59 +++++++----------- .../api/websockets}/websocket_state_test.go | 39 +++++------- pkg/dash/dash.go | 22 ++++--- 41 files changed, 297 insertions(+), 202 deletions(-) create mode 100644 internal/api/state_manager.go rename {internal => pkg}/api/action.go (84%) rename {internal => pkg}/api/fake/mock_action_dispatcher.go (94%) rename {internal => pkg}/api/fake/mock_client_factory.go (85%) rename {internal => pkg}/api/fake/mock_client_manager.go (77%) rename {internal => pkg}/api/fake/mock_octant_client.go (96%) rename {internal => pkg}/api/fake/mock_state_manager.go (93%) rename {internal => pkg}/api/fake/mock_streaming_client.go (96%) create mode 100644 pkg/api/state_manager.go create mode 100644 pkg/api/streaming.go rename {internal => pkg}/api/streaming_connection_manager.go (70%) rename {internal/api => pkg/api/websockets}/websocket_client.go (89%) rename {internal/api => pkg/api/websockets}/websocket_client_test.go (98%) rename {internal/api => pkg/api/websockets}/websocket_connection_factory.go (55%) rename {internal/api => pkg/api/websockets}/websocket_state.go (85%) rename {internal/api => pkg/api/websockets}/websocket_state_test.go (88%) diff --git a/internal/api/action_request_manager.go b/internal/api/action_request_manager.go index 6d28aef787..82c6c73f40 100644 --- a/internal/api/action_request_manager.go +++ b/internal/api/action_request_manager.go @@ -12,6 +12,7 @@ import ( ocontext "github.com/vmware-tanzu/octant/internal/context" "github.com/vmware-tanzu/octant/internal/octant" "github.com/vmware-tanzu/octant/pkg/action" + "github.com/vmware-tanzu/octant/pkg/api" ) const ( @@ -33,7 +34,7 @@ func NewActionRequestManager(dashConfig config.Dash) *ActionRequestManager { } } -func (a ActionRequestManager) Start(ctx context.Context, state octant.State, s OctantClient) { +func (a ActionRequestManager) Start(ctx context.Context, state octant.State, s api.OctantClient) { } // Handlers returns the handlers this manager supports. diff --git a/internal/api/api.go b/internal/api/api.go index e45b144ba1..070b827935 100644 --- a/internal/api/api.go +++ b/internal/api/api.go @@ -22,6 +22,7 @@ import ( "github.com/vmware-tanzu/octant/internal/config" "github.com/vmware-tanzu/octant/internal/mime" "github.com/vmware-tanzu/octant/internal/module" + "github.com/vmware-tanzu/octant/pkg/api" "github.com/vmware-tanzu/octant/pkg/log" ) @@ -36,7 +37,7 @@ const ( defaultListenerAddr = "127.0.0.1:7777" ) -func acceptedHosts() []string { +func AcceptedHosts() []string { hosts := []string{ "localhost", "127.0.0.1", @@ -117,11 +118,11 @@ func RespondWithError(w http.ResponseWriter, code int, message string, logger lo type API struct { ctx context.Context moduleManager module.ManagerInterface - actionDispatcher ActionDispatcher + actionDispatcher api.ActionDispatcher prefix string dashConfig config.Dash logger log.Logger - scManager *StreamingConnectionManager + scManager *api.StreamingConnectionManager modulePaths map[string]module.Module modules []module.Module @@ -131,7 +132,7 @@ type API struct { var _ Service = (*API)(nil) // New creates an instance of API. -func New(ctx context.Context, prefix string, actionDispatcher ActionDispatcher, streamingConnectionManager *StreamingConnectionManager, dashConfig config.Dash) *API { +func New(ctx context.Context, prefix string, actionDispatcher api.ActionDispatcher, streamingConnectionManager *api.StreamingConnectionManager, dashConfig config.Dash) *API { logger := dashConfig.Logger().With("component", "api") return &API{ ctx: ctx, @@ -156,7 +157,7 @@ func (a *API) Handler(ctx context.Context) (http.Handler, error) { return nil, fmt.Errorf("missing dashConfig") } router := mux.NewRouter() - router.Use(rebindHandler(ctx, acceptedHosts())) + router.Use(rebindHandler(ctx, AcceptedHosts())) s := router.PathPrefix(a.prefix).Subrouter() @@ -174,10 +175,10 @@ func (a *API) Handler(ctx context.Context) (http.Handler, error) { type LoadingAPI struct { ctx context.Context moduleManager module.ManagerInterface - actionDispatcher ActionDispatcher + actionDispatcher api.ActionDispatcher prefix string logger log.Logger - scManager *StreamingConnectionManager + scManager *api.StreamingConnectionManager modulePaths map[string]module.Module modules []module.Module @@ -187,7 +188,7 @@ type LoadingAPI struct { var _ Service = (*LoadingAPI)(nil) // NewLoadingAPI creates an instance of LoadingAPI -func NewLoadingAPI(ctx context.Context, prefix string, actionDispatcher ActionDispatcher, websocketClientManager *StreamingConnectionManager, logger log.Logger) *LoadingAPI { +func NewLoadingAPI(ctx context.Context, prefix string, actionDispatcher api.ActionDispatcher, websocketClientManager *api.StreamingConnectionManager, logger log.Logger) *LoadingAPI { logger = logger.With("component", "loading api") return &LoadingAPI{ ctx: ctx, @@ -208,7 +209,7 @@ func (l *LoadingAPI) ForceUpdate() error { // Handler contains a list of handlers func (l *LoadingAPI) Handler(ctx context.Context) (http.Handler, error) { router := mux.NewRouter() - router.Use(rebindHandler(ctx, acceptedHosts())) + router.Use(rebindHandler(ctx, AcceptedHosts())) s := router.PathPrefix(l.prefix).Subrouter() diff --git a/internal/api/api_test.go b/internal/api/api_test.go index f0c323fa57..7cf5a30901 100644 --- a/internal/api/api_test.go +++ b/internal/api/api_test.go @@ -20,13 +20,14 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "github.com/vmware-tanzu/octant/internal/api" - apiFake "github.com/vmware-tanzu/octant/internal/api/fake" + internalAPI "github.com/vmware-tanzu/octant/internal/api" clusterFake "github.com/vmware-tanzu/octant/internal/cluster/fake" configFake "github.com/vmware-tanzu/octant/internal/config/fake" "github.com/vmware-tanzu/octant/internal/log" "github.com/vmware-tanzu/octant/internal/module" moduleFake "github.com/vmware-tanzu/octant/internal/module/fake" + "github.com/vmware-tanzu/octant/pkg/api" + apiFake "github.com/vmware-tanzu/octant/pkg/api/fake" "github.com/vmware-tanzu/octant/pkg/navigation" "github.com/vmware-tanzu/octant/pkg/view/component" ) @@ -100,7 +101,7 @@ func TestAPI_routes(t *testing.T) { ctx := context.Background() scManager := api.NewStreamingConnectionManager(ctx, actionDispatcher, streamingClientFactory) - srv := api.New(ctx, "/", actionDispatcher, scManager, dashConfig) + srv := internalAPI.New(ctx, "/", actionDispatcher, scManager, dashConfig) handler, err := srv.Handler(ctx) require.NoError(t, err) diff --git a/internal/api/container_logs.go b/internal/api/container_logs.go index d03518abfb..ca4c6e9dfc 100644 --- a/internal/api/container_logs.go +++ b/internal/api/container_logs.go @@ -12,6 +12,7 @@ import ( "sync" "time" + "github.com/vmware-tanzu/octant/pkg/api" "github.com/vmware-tanzu/octant/pkg/event" "github.com/vmware-tanzu/octant/internal/gvk" @@ -36,7 +37,7 @@ const ( ) type podLogsStateManager struct { - client OctantClient + client api.OctantClient config config.Dash ctx context.Context @@ -145,7 +146,7 @@ func (s *podLogsStateManager) StreamPodLogsUnsubscribe(_ octant.State, payload a return nil } -func (s *podLogsStateManager) Start(ctx context.Context, _ octant.State, client OctantClient) { +func (s *podLogsStateManager) Start(ctx context.Context, _ octant.State, client api.OctantClient) { s.client = client s.ctx = ctx } diff --git a/internal/api/container_logs_test.go b/internal/api/container_logs_test.go index 0882a0579e..4ed7f433f8 100644 --- a/internal/api/container_logs_test.go +++ b/internal/api/container_logs_test.go @@ -6,6 +6,7 @@ import ( "testing" "time" + "github.com/vmware-tanzu/octant/pkg/api" "github.com/vmware-tanzu/octant/pkg/event" "github.com/golang/mock/gomock" @@ -106,7 +107,7 @@ type octantClient struct { stopCh chan struct{} } -var _ OctantClient = &octantClient{} +var _ api.OctantClient = &octantClient{} func newOctantClient() *octantClient { return &octantClient{ diff --git a/internal/api/content_manager.go b/internal/api/content_manager.go index 14bde8c94e..c7f84e473b 100644 --- a/internal/api/content_manager.go +++ b/internal/api/content_manager.go @@ -18,6 +18,7 @@ import ( "github.com/vmware-tanzu/octant/internal/config" ocontext "github.com/vmware-tanzu/octant/internal/context" + "github.com/vmware-tanzu/octant/pkg/api" oevent "github.com/vmware-tanzu/octant/pkg/event" oerrors "github.com/vmware-tanzu/octant/internal/errors" @@ -107,7 +108,7 @@ func NewContentManager(moduleManager module.ManagerInterface, dashConfig config. var _ StateManager = (*ContentManager)(nil) // Start starts the manager. -func (cm *ContentManager) Start(ctx context.Context, state octant.State, s OctantClient) { +func (cm *ContentManager) Start(ctx context.Context, state octant.State, s api.OctantClient) { cm.ctx = ctx logger := internalLog.From(ctx) logger.Debugf("starting content manager") @@ -132,7 +133,7 @@ func (cm *ContentManager) Start(ctx context.Context, state octant.State, s Octan cm.poller.Run(ctx, cm.updateContentCh, cm.runUpdate(state, s), event.DefaultScheduleDelay) } -func (cm *ContentManager) runUpdate(state octant.State, s OctantClient) PollerFunc { +func (cm *ContentManager) runUpdate(state octant.State, s api.OctantClient) PollerFunc { previousChecksum := "" return func(ctx context.Context) bool { diff --git a/internal/api/content_manager_test.go b/internal/api/content_manager_test.go index 9220333c92..684bb7ec3b 100644 --- a/internal/api/content_manager_test.go +++ b/internal/api/content_manager_test.go @@ -7,13 +7,14 @@ package api_test import ( "context" + "sort" "testing" "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/vmware-tanzu/octant/internal/api" - "github.com/vmware-tanzu/octant/internal/api/fake" configFake "github.com/vmware-tanzu/octant/internal/config/fake" ocontext "github.com/vmware-tanzu/octant/internal/context" "github.com/vmware-tanzu/octant/internal/log" @@ -22,6 +23,7 @@ import ( "github.com/vmware-tanzu/octant/internal/octant" octantFake "github.com/vmware-tanzu/octant/internal/octant/fake" "github.com/vmware-tanzu/octant/pkg/action" + "github.com/vmware-tanzu/octant/pkg/api/fake" "github.com/vmware-tanzu/octant/pkg/navigation" "github.com/vmware-tanzu/octant/pkg/view/component" ) @@ -209,3 +211,15 @@ func TestContentManager_SetQueryParams(t *testing.T) { }) } } + +func AssertHandlers(t *testing.T, manager api.StateManager, expected []string) { + handlers := manager.Handlers() + var got []string + for _, h := range handlers { + got = append(got, h.RequestType) + } + sort.Strings(got) + sort.Strings(expected) + + assert.Equal(t, expected, got) +} diff --git a/internal/api/context_manager.go b/internal/api/context_manager.go index 47287474f4..7726da26cf 100644 --- a/internal/api/context_manager.go +++ b/internal/api/context_manager.go @@ -11,6 +11,7 @@ import ( "github.com/vmware-tanzu/octant/internal/util/json" + "github.com/vmware-tanzu/octant/pkg/api" oevent "github.com/vmware-tanzu/octant/pkg/event" "github.com/pkg/errors" @@ -89,12 +90,12 @@ func (c *ContextManager) SetContext(state octant.State, payload action.Payload) } // Start starts the manager. -func (c *ContextManager) Start(ctx context.Context, state octant.State, s OctantClient) { +func (c *ContextManager) Start(ctx context.Context, state octant.State, s api.OctantClient) { c.poller.Run(ctx, nil, c.runUpdate(state, s), event.DefaultScheduleDelay) c.ctx = ctx } -func (c *ContextManager) runUpdate(state octant.State, s OctantClient) PollerFunc { +func (c *ContextManager) runUpdate(state octant.State, s api.OctantClient) PollerFunc { var previous []byte logger := c.dashConfig.Logger() diff --git a/internal/api/context_manager_test.go b/internal/api/context_manager_test.go index 9dd09eeeed..20f6b50e68 100644 --- a/internal/api/context_manager_test.go +++ b/internal/api/context_manager_test.go @@ -15,11 +15,11 @@ import ( "github.com/golang/mock/gomock" "github.com/vmware-tanzu/octant/internal/api" - "github.com/vmware-tanzu/octant/internal/api/fake" configFake "github.com/vmware-tanzu/octant/internal/config/fake" "github.com/vmware-tanzu/octant/internal/log" "github.com/vmware-tanzu/octant/internal/octant" octantFake "github.com/vmware-tanzu/octant/internal/octant/fake" + "github.com/vmware-tanzu/octant/pkg/api/fake" ) func TestContextManager_Handlers(t *testing.T) { diff --git a/internal/api/filter_manager.go b/internal/api/filter_manager.go index 924754bda6..c76233332b 100644 --- a/internal/api/filter_manager.go +++ b/internal/api/filter_manager.go @@ -16,6 +16,7 @@ import ( "github.com/vmware-tanzu/octant/internal/octant" "github.com/vmware-tanzu/octant/pkg/action" + "github.com/vmware-tanzu/octant/pkg/api" ) const ( @@ -37,7 +38,7 @@ func NewFilterManager() *FilterManager { } // Start starts the manager. Current is a no-op. -func (fm *FilterManager) Start(ctx context.Context, state octant.State, s OctantClient) { +func (fm *FilterManager) Start(ctx context.Context, state octant.State, s api.OctantClient) { fm.ctx = ctx } diff --git a/internal/api/helper_manager.go b/internal/api/helper_manager.go index 8534618c47..1203b617a4 100644 --- a/internal/api/helper_manager.go +++ b/internal/api/helper_manager.go @@ -8,6 +8,7 @@ package api import ( "context" + "github.com/vmware-tanzu/octant/pkg/api" oevent "github.com/vmware-tanzu/octant/pkg/event" "github.com/vmware-tanzu/octant/internal/config" @@ -67,7 +68,7 @@ func (h *HelperStateManager) Handlers() []octant.ClientRequestHandler { } // Start starts the manager -func (h *HelperStateManager) Start(ctx context.Context, state octant.State, client OctantClient) { +func (h *HelperStateManager) Start(ctx context.Context, state octant.State, client api.OctantClient) { h.poller.Run(ctx, nil, h.runUpdate(state, client), event.DefaultScheduleDelay) } diff --git a/internal/api/helper_manager_test.go b/internal/api/helper_manager_test.go index c6059a9770..a270182b46 100644 --- a/internal/api/helper_manager_test.go +++ b/internal/api/helper_manager_test.go @@ -14,10 +14,10 @@ import ( "github.com/golang/mock/gomock" "github.com/vmware-tanzu/octant/internal/api" - "github.com/vmware-tanzu/octant/internal/api/fake" configFake "github.com/vmware-tanzu/octant/internal/config/fake" "github.com/vmware-tanzu/octant/internal/log" octantFake "github.com/vmware-tanzu/octant/internal/octant/fake" + "github.com/vmware-tanzu/octant/pkg/api/fake" ) func TestHelperManager_GenerateContent(t *testing.T) { diff --git a/internal/api/loading_state.go b/internal/api/loading_state.go index c416a12ba8..e6f15c567e 100644 --- a/internal/api/loading_state.go +++ b/internal/api/loading_state.go @@ -15,6 +15,7 @@ import ( "sync/atomic" "time" + "github.com/vmware-tanzu/octant/pkg/api" "github.com/vmware-tanzu/octant/pkg/event" "github.com/spf13/afero" @@ -57,7 +58,7 @@ func (l *LoadingManager) Handlers() []octant.ClientRequestHandler { } } -func (l *LoadingManager) Start(ctx context.Context, state octant.State, client OctantClient) { +func (l *LoadingManager) Start(ctx context.Context, state octant.State, client api.OctantClient) { l.client.Store(client) l.ctx = ctx l.kubeConfigPath = ocontext.KubeConfigChFrom(ctx) @@ -76,7 +77,7 @@ func (l *LoadingManager) CheckLoading(state octant.State, payload action.Payload } if loading { - client := l.client.Load().(OctantClient) + client := l.client.Load().(api.OctantClient) client.Send(event.Event{ Type: event.EventTypeLoading, }) @@ -123,7 +124,7 @@ func (l *LoadingManager) UploadKubeConfig(state octant.State, payload action.Pay return err } - client := l.client.Load().(OctantClient) + client := l.client.Load().(api.OctantClient) client.Send(event.Event{ Type: event.EventTypeRefresh, }) @@ -132,7 +133,7 @@ func (l *LoadingManager) UploadKubeConfig(state octant.State, payload action.Pay return nil } -func (l *LoadingManager) WatchConfig(path chan string, client OctantClient, fs afero.Fs) { +func (l *LoadingManager) WatchConfig(path chan string, client api.OctantClient, fs afero.Fs) { kubeconfig := clientcmd.NewDefaultClientConfigLoadingRules().GetDefaultFilename() for { diff --git a/internal/api/loading_state_test.go b/internal/api/loading_state_test.go index 1a026bf8b2..08ce4a4222 100644 --- a/internal/api/loading_state_test.go +++ b/internal/api/loading_state_test.go @@ -17,7 +17,7 @@ import ( "k8s.io/client-go/tools/clientcmd" "github.com/vmware-tanzu/octant/internal/api" - "github.com/vmware-tanzu/octant/internal/api/fake" + "github.com/vmware-tanzu/octant/pkg/api/fake" ) func Test_watchConfig(t *testing.T) { diff --git a/internal/api/middleware.go b/internal/api/middleware.go index b4f7efd21a..f971793c05 100644 --- a/internal/api/middleware.go +++ b/internal/api/middleware.go @@ -82,7 +82,7 @@ func checkSameOrigin(r *http.Request) bool { // shouldAllowHost returns true if the incoming request.Host shuold be allowed // to access the API otherwise false. -func shouldAllowHost(host string, acceptedHosts []string) bool { +func ShouldAllowHost(host string, acceptedHosts []string) bool { if dashstrings.Contains("0.0.0.0", acceptedHosts) { return true } diff --git a/internal/api/middleware_test.go b/internal/api/middleware_test.go index 26b78de26f..326b19916b 100644 --- a/internal/api/middleware_test.go +++ b/internal/api/middleware_test.go @@ -87,7 +87,7 @@ func Test_rebindHandler(t *testing.T) { fmt.Fprint(w, "response") }) - wrapped := rebindHandler(context.TODO(), acceptedHosts())(fake) + wrapped := rebindHandler(context.TODO(), AcceptedHosts())(fake) ts := httptest.NewServer(wrapped) defer ts.Close() @@ -140,7 +140,7 @@ func Test_shouldAllowHost(t *testing.T) { for _, tc := range cases { t.Run(tc.name, func(t *testing.T) { - require.Equal(t, tc.expected, shouldAllowHost(tc.host, tc.acceptedHosts)) + require.Equal(t, tc.expected, ShouldAllowHost(tc.host, tc.acceptedHosts)) }) } } diff --git a/internal/api/namespaces_manager.go b/internal/api/namespaces_manager.go index b9031eaebd..0d0127a806 100644 --- a/internal/api/namespaces_manager.go +++ b/internal/api/namespaces_manager.go @@ -17,6 +17,7 @@ import ( "github.com/vmware-tanzu/octant/internal/event" "github.com/vmware-tanzu/octant/internal/log" "github.com/vmware-tanzu/octant/internal/octant" + "github.com/vmware-tanzu/octant/pkg/api" oevent "github.com/vmware-tanzu/octant/pkg/event" ) @@ -75,7 +76,7 @@ func (n NamespacesManager) Handlers() []octant.ClientRequestHandler { } // Start starts the manager. It periodically generates a list of namespaces. -func (n *NamespacesManager) Start(ctx context.Context, state octant.State, s OctantClient) { +func (n *NamespacesManager) Start(ctx context.Context, state octant.State, s api.OctantClient) { ch := make(chan struct{}, 1) defer func() { close(ch) @@ -84,7 +85,7 @@ func (n *NamespacesManager) Start(ctx context.Context, state octant.State, s Oct n.poller.Run(ctx, ch, n.runUpdate(state, s), event.DefaultScheduleDelay) } -func (n *NamespacesManager) runUpdate(state octant.State, client OctantClient) PollerFunc { +func (n *NamespacesManager) runUpdate(state octant.State, client api.OctantClient) PollerFunc { var previous []byte return func(ctx context.Context) bool { diff --git a/internal/api/namespaces_manager_test.go b/internal/api/namespaces_manager_test.go index 4d03dae4c4..0b0a6bed6f 100644 --- a/internal/api/namespaces_manager_test.go +++ b/internal/api/namespaces_manager_test.go @@ -13,10 +13,10 @@ import ( "github.com/stretchr/testify/require" "github.com/vmware-tanzu/octant/internal/api" - "github.com/vmware-tanzu/octant/internal/api/fake" clusterFake "github.com/vmware-tanzu/octant/internal/cluster/fake" configFake "github.com/vmware-tanzu/octant/internal/config/fake" octantFake "github.com/vmware-tanzu/octant/internal/octant/fake" + "github.com/vmware-tanzu/octant/pkg/api/fake" ) func TestNamespacesManager_GenerateNamespaces(t *testing.T) { diff --git a/internal/api/navigation_manager.go b/internal/api/navigation_manager.go index 1204bd5b67..a8a255bd05 100644 --- a/internal/api/navigation_manager.go +++ b/internal/api/navigation_manager.go @@ -13,6 +13,7 @@ import ( "github.com/vmware-tanzu/octant/internal/util/json" + "github.com/vmware-tanzu/octant/pkg/api" oevent "github.com/vmware-tanzu/octant/pkg/event" "github.com/pkg/errors" @@ -80,7 +81,7 @@ func (n NavigationManager) Handlers() []octant.ClientRequestHandler { } // Start starts the manager. It periodically generates navigation updates. -func (n *NavigationManager) Start(ctx context.Context, state octant.State, s OctantClient) { +func (n *NavigationManager) Start(ctx context.Context, state octant.State, s api.OctantClient) { ch := make(chan struct{}, 1) defer func() { close(ch) @@ -89,7 +90,7 @@ func (n *NavigationManager) Start(ctx context.Context, state octant.State, s Oct n.poller.Run(ctx, ch, n.runUpdate(state, s), event.DefaultScheduleDelay) } -func (n *NavigationManager) runUpdate(state octant.State, client OctantClient) PollerFunc { +func (n *NavigationManager) runUpdate(state octant.State, client api.OctantClient) PollerFunc { var previous []byte return func(ctx context.Context) bool { diff --git a/internal/api/navigation_manager_test.go b/internal/api/navigation_manager_test.go index 7d602930e8..dee844cf1d 100644 --- a/internal/api/navigation_manager_test.go +++ b/internal/api/navigation_manager_test.go @@ -13,12 +13,12 @@ import ( "github.com/stretchr/testify/require" "github.com/vmware-tanzu/octant/internal/api" - "github.com/vmware-tanzu/octant/internal/api/fake" configFake "github.com/vmware-tanzu/octant/internal/config/fake" "github.com/vmware-tanzu/octant/internal/module" moduleFake "github.com/vmware-tanzu/octant/internal/module/fake" "github.com/vmware-tanzu/octant/internal/octant" octantFake "github.com/vmware-tanzu/octant/internal/octant/fake" + "github.com/vmware-tanzu/octant/pkg/api/fake" "github.com/vmware-tanzu/octant/pkg/navigation" ) diff --git a/internal/api/state_manager.go b/internal/api/state_manager.go new file mode 100644 index 0000000000..008e4d5125 --- /dev/null +++ b/internal/api/state_manager.go @@ -0,0 +1,10 @@ +/* + Copyright (c) 2019 the Octant contributors. All Rights Reserved. + SPDX-License-Identifier: Apache-2.0 +*/ + +package api + +import "github.com/vmware-tanzu/octant/pkg/api" + +type StateManager = api.StateManager diff --git a/internal/api/streaming_service.go b/internal/api/streaming_service.go index 49c085088b..e4da453332 100644 --- a/internal/api/streaming_service.go +++ b/internal/api/streaming_service.go @@ -10,15 +10,16 @@ import ( "net/http" "github.com/vmware-tanzu/octant/internal/config" + "github.com/vmware-tanzu/octant/pkg/api" ) -func streamService(manager ClientManager, dashConfig config.Dash) http.HandlerFunc { +func streamService(manager api.ClientManager, dashConfig config.Dash) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { serveStreamingApi(manager, dashConfig, w, r) } } -func serveStreamingApi(manager ClientManager, dashConfig config.Dash, w http.ResponseWriter, r *http.Request) { +func serveStreamingApi(manager api.ClientManager, dashConfig config.Dash, w http.ResponseWriter, r *http.Request) { _, err := manager.ClientFromRequest(dashConfig, w, r) if err != nil { if dashConfig != nil { @@ -30,13 +31,13 @@ func serveStreamingApi(manager ClientManager, dashConfig config.Dash, w http.Res } // Create dummy websocketService and serveWebsocket -func loadingStreamService(manager ClientManager) http.HandlerFunc { +func loadingStreamService(manager api.ClientManager) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { serveLoadingStreamingApi(manager, w, r) } } -func serveLoadingStreamingApi(manager ClientManager, w http.ResponseWriter, r *http.Request) { +func serveLoadingStreamingApi(manager api.ClientManager, w http.ResponseWriter, r *http.Request) { _, err := manager.TemporaryClientFromLoadingRequest(w, r) if err != nil { fmt.Println("create loading websocket client") diff --git a/internal/api/terminal_manager.go b/internal/api/terminal_manager.go index a2fb79d3d9..e35a57c2ae 100644 --- a/internal/api/terminal_manager.go +++ b/internal/api/terminal_manager.go @@ -15,6 +15,7 @@ import ( "github.com/vmware-tanzu/octant/internal/util/kubernetes" + "github.com/vmware-tanzu/octant/pkg/api" "github.com/vmware-tanzu/octant/pkg/event" "github.com/pkg/errors" @@ -36,7 +37,7 @@ const ( ) type terminalStateManager struct { - client OctantClient + client api.OctantClient config config.Dash ctx context.Context instance terminal.Instance @@ -212,7 +213,7 @@ func (s *terminalStateManager) SendTerminalCommand(state octant.State, payload a return s.instance.Write([]byte(key)) } -func (s *terminalStateManager) Start(ctx context.Context, state octant.State, client OctantClient) { +func (s *terminalStateManager) Start(ctx context.Context, state octant.State, client api.OctantClient) { s.client = client s.ctx = ctx } diff --git a/internal/api/terminal_manager_test.go b/internal/api/terminal_manager_test.go index 053c10a593..ecd581587e 100644 --- a/internal/api/terminal_manager_test.go +++ b/internal/api/terminal_manager_test.go @@ -12,9 +12,9 @@ import ( "github.com/golang/mock/gomock" - "github.com/vmware-tanzu/octant/internal/api/fake" configFake "github.com/vmware-tanzu/octant/internal/config/fake" octantFake "github.com/vmware-tanzu/octant/internal/octant/fake" + "github.com/vmware-tanzu/octant/pkg/api/fake" ) func Test_TerminalStateManager(t *testing.T) { diff --git a/internal/api/websocket_service_test.go b/internal/api/websocket_service_test.go index 36546466e7..86b6a7fda5 100644 --- a/internal/api/websocket_service_test.go +++ b/internal/api/websocket_service_test.go @@ -6,13 +6,14 @@ import ( "testing" "github.com/vmware-tanzu/octant/internal/config" + "github.com/vmware-tanzu/octant/pkg/api" ) type fakeWebsocketClientManager struct { - StreamingConnectionManager + api.StreamingConnectionManager } -func (c *fakeWebsocketClientManager) ClientFromRequest(dashConfig config.Dash, w http.ResponseWriter, r *http.Request) (StreamingClient, error) { +func (c *fakeWebsocketClientManager) ClientFromRequest(dashConfig config.Dash, w http.ResponseWriter, r *http.Request) (api.StreamingClient, error) { return nil, fmt.Errorf("test: error") } diff --git a/internal/api/action.go b/pkg/api/action.go similarity index 84% rename from internal/api/action.go rename to pkg/api/action.go index 154e85cf6e..11881a5276 100644 --- a/internal/api/action.go +++ b/pkg/api/action.go @@ -11,7 +11,7 @@ import ( "github.com/vmware-tanzu/octant/pkg/action" ) -//go:generate mockgen -destination=./fake/mock_action_dispatcher.go -package=fake github.com/vmware-tanzu/octant/internal/api ActionDispatcher +//go:generate mockgen -destination=./fake/mock_action_dispatcher.go -package=fake github.com/vmware-tanzu/octant/pkg/api ActionDispatcher // ActionDispatcher dispatches actions. type ActionDispatcher interface { diff --git a/internal/api/fake/mock_action_dispatcher.go b/pkg/api/fake/mock_action_dispatcher.go similarity index 94% rename from internal/api/fake/mock_action_dispatcher.go rename to pkg/api/fake/mock_action_dispatcher.go index 496128dbf5..46c8966f64 100644 --- a/internal/api/fake/mock_action_dispatcher.go +++ b/pkg/api/fake/mock_action_dispatcher.go @@ -1,5 +1,5 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: github.com/vmware-tanzu/octant/internal/api (interfaces: ActionDispatcher) +// Source: github.com/vmware-tanzu/octant/pkg/api (interfaces: ActionDispatcher) // Package fake is a generated GoMock package. package fake diff --git a/internal/api/fake/mock_client_factory.go b/pkg/api/fake/mock_client_factory.go similarity index 85% rename from internal/api/fake/mock_client_factory.go rename to pkg/api/fake/mock_client_factory.go index 418811dc72..8edd079b3c 100644 --- a/internal/api/fake/mock_client_factory.go +++ b/pkg/api/fake/mock_client_factory.go @@ -1,5 +1,5 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: github.com/vmware-tanzu/octant/internal/api (interfaces: StreamingClientFactory) +// Source: github.com/vmware-tanzu/octant/pkg/api (interfaces: StreamingClientFactory) // Package fake is a generated GoMock package. package fake @@ -12,8 +12,8 @@ import ( gomock "github.com/golang/mock/gomock" uuid "github.com/google/uuid" - api "github.com/vmware-tanzu/octant/internal/api" config "github.com/vmware-tanzu/octant/internal/config" + api "github.com/vmware-tanzu/octant/pkg/api" ) // MockStreamingClientFactory is a mock of StreamingClientFactory interface @@ -40,7 +40,7 @@ func (m *MockStreamingClientFactory) EXPECT() *MockStreamingClientFactoryMockRec } // NewConnection mocks base method -func (m *MockStreamingClientFactory) NewConnection(arg0 uuid.UUID, arg1 http.ResponseWriter, arg2 *http.Request, arg3 *api.StreamingConnectionManager, arg4 config.Dash) (api.StreamingClient, context.CancelFunc, error) { +func (m *MockStreamingClientFactory) NewConnection(arg0 uuid.UUID, arg1 http.ResponseWriter, arg2 *http.Request, arg3 api.ClientManager, arg4 config.Dash) (api.StreamingClient, context.CancelFunc, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "NewConnection", arg0, arg1, arg2, arg3, arg4) ret0, _ := ret[0].(api.StreamingClient) @@ -56,7 +56,7 @@ func (mr *MockStreamingClientFactoryMockRecorder) NewConnection(arg0, arg1, arg2 } // NewTemporaryConnection mocks base method -func (m *MockStreamingClientFactory) NewTemporaryConnection(arg0 uuid.UUID, arg1 http.ResponseWriter, arg2 *http.Request, arg3 *api.StreamingConnectionManager) (api.StreamingClient, context.CancelFunc, error) { +func (m *MockStreamingClientFactory) NewTemporaryConnection(arg0 uuid.UUID, arg1 http.ResponseWriter, arg2 *http.Request, arg3 api.ClientManager) (api.StreamingClient, context.CancelFunc, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "NewTemporaryConnection", arg0, arg1, arg2, arg3) ret0, _ := ret[0].(api.StreamingClient) diff --git a/internal/api/fake/mock_client_manager.go b/pkg/api/fake/mock_client_manager.go similarity index 77% rename from internal/api/fake/mock_client_manager.go rename to pkg/api/fake/mock_client_manager.go index 8cdb672040..a1c271b10c 100644 --- a/internal/api/fake/mock_client_manager.go +++ b/pkg/api/fake/mock_client_manager.go @@ -1,5 +1,5 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: github.com/vmware-tanzu/octant/internal/api (interfaces: ClientManager) +// Source: github.com/vmware-tanzu/octant/pkg/api (interfaces: ClientManager) // Package fake is a generated GoMock package. package fake @@ -11,8 +11,8 @@ import ( gomock "github.com/golang/mock/gomock" - api "github.com/vmware-tanzu/octant/internal/api" config "github.com/vmware-tanzu/octant/internal/config" + api "github.com/vmware-tanzu/octant/pkg/api" event "github.com/vmware-tanzu/octant/pkg/event" ) @@ -39,6 +39,20 @@ func (m *MockClientManager) EXPECT() *MockClientManagerMockRecorder { return m.recorder } +// ActionDispatcher mocks base method +func (m *MockClientManager) ActionDispatcher() api.ActionDispatcher { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ActionDispatcher") + ret0, _ := ret[0].(api.ActionDispatcher) + return ret0 +} + +// ActionDispatcher indicates an expected call of ActionDispatcher +func (mr *MockClientManagerMockRecorder) ActionDispatcher() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ActionDispatcher", reflect.TypeOf((*MockClientManager)(nil).ActionDispatcher)) +} + // ClientFromRequest mocks base method func (m *MockClientManager) ClientFromRequest(arg0 config.Dash, arg1 http.ResponseWriter, arg2 *http.Request) (api.StreamingClient, error) { m.ctrl.T.Helper() @@ -68,6 +82,20 @@ func (mr *MockClientManagerMockRecorder) Clients() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Clients", reflect.TypeOf((*MockClientManager)(nil).Clients)) } +// Context mocks base method +func (m *MockClientManager) Context() context.Context { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Context") + ret0, _ := ret[0].(context.Context) + return ret0 +} + +// Context indicates an expected call of Context +func (mr *MockClientManagerMockRecorder) Context() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Context", reflect.TypeOf((*MockClientManager)(nil).Context)) +} + // Get mocks base method func (m *MockClientManager) Get(arg0 string) event.WSEventSender { m.ctrl.T.Helper() diff --git a/internal/api/fake/mock_octant_client.go b/pkg/api/fake/mock_octant_client.go similarity index 96% rename from internal/api/fake/mock_octant_client.go rename to pkg/api/fake/mock_octant_client.go index 14ff44fa7a..3fa573078b 100644 --- a/internal/api/fake/mock_octant_client.go +++ b/pkg/api/fake/mock_octant_client.go @@ -1,5 +1,5 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: github.com/vmware-tanzu/octant/internal/api (interfaces: OctantClient) +// Source: github.com/vmware-tanzu/octant/pkg/api (interfaces: OctantClient) // Package fake is a generated GoMock package. package fake diff --git a/internal/api/fake/mock_state_manager.go b/pkg/api/fake/mock_state_manager.go similarity index 93% rename from internal/api/fake/mock_state_manager.go rename to pkg/api/fake/mock_state_manager.go index 920fe08ca2..9ebe21a39e 100644 --- a/internal/api/fake/mock_state_manager.go +++ b/pkg/api/fake/mock_state_manager.go @@ -1,5 +1,5 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: github.com/vmware-tanzu/octant/internal/api (interfaces: StateManager) +// Source: github.com/vmware-tanzu/octant/pkg/api (interfaces: StateManager) // Package fake is a generated GoMock package. package fake @@ -10,8 +10,8 @@ import ( gomock "github.com/golang/mock/gomock" - api "github.com/vmware-tanzu/octant/internal/api" octant "github.com/vmware-tanzu/octant/internal/octant" + api "github.com/vmware-tanzu/octant/pkg/api" ) // MockStateManager is a mock of StateManager interface diff --git a/internal/api/fake/mock_streaming_client.go b/pkg/api/fake/mock_streaming_client.go similarity index 96% rename from internal/api/fake/mock_streaming_client.go rename to pkg/api/fake/mock_streaming_client.go index 6b195a523d..0595495573 100644 --- a/internal/api/fake/mock_streaming_client.go +++ b/pkg/api/fake/mock_streaming_client.go @@ -1,5 +1,5 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: github.com/vmware-tanzu/octant/internal/api (interfaces: StreamingClient) +// Source: github.com/vmware-tanzu/octant/pkg/api (interfaces: StreamingClient) // Package fake is a generated GoMock package. package fake @@ -9,8 +9,8 @@ import ( gomock "github.com/golang/mock/gomock" - api "github.com/vmware-tanzu/octant/internal/api" octant "github.com/vmware-tanzu/octant/internal/octant" + api "github.com/vmware-tanzu/octant/pkg/api" event "github.com/vmware-tanzu/octant/pkg/event" ) diff --git a/pkg/api/state_manager.go b/pkg/api/state_manager.go new file mode 100644 index 0000000000..0c6f2ed0dd --- /dev/null +++ b/pkg/api/state_manager.go @@ -0,0 +1,19 @@ +/* + Copyright (c) 2019 the Octant contributors. All Rights Reserved. + SPDX-License-Identifier: Apache-2.0 +*/ + +package api + +import ( + "context" + + "github.com/vmware-tanzu/octant/internal/octant" +) + +//go:generate mockgen -destination=./fake/mock_state_manager.go -package=fake github.com/vmware-tanzu/octant/pkg/api StateManager +// StateManager manages states for WebsocketState. +type StateManager interface { + Handlers() []octant.ClientRequestHandler + Start(ctx context.Context, state octant.State, s OctantClient) +} diff --git a/pkg/api/streaming.go b/pkg/api/streaming.go new file mode 100644 index 0000000000..0bf1b765a1 --- /dev/null +++ b/pkg/api/streaming.go @@ -0,0 +1,62 @@ +/* + Copyright (c) 2019 the Octant contributors. All Rights Reserved. + SPDX-License-Identifier: Apache-2.0 +*/ +package api + +import ( + "context" + "net/http" + + "github.com/google/uuid" + + "github.com/vmware-tanzu/octant/internal/config" + "github.com/vmware-tanzu/octant/internal/octant" + "github.com/vmware-tanzu/octant/pkg/action" + "github.com/vmware-tanzu/octant/pkg/event" +) + +//go:generate mockgen -destination=./fake/mock_client_manager.go -package=fake github.com/vmware-tanzu/octant/pkg/api ClientManager +//go:generate mockgen -destination=./fake/mock_client_factory.go -package=fake github.com/vmware-tanzu/octant/pkg/api StreamingClientFactory +//go:generate mockgen -destination=./fake/mock_streaming_client.go -package=fake github.com/vmware-tanzu/octant/pkg/api StreamingClient +//go:generate mockgen -destination=./fake/mock_octant_client.go -package=fake github.com/vmware-tanzu/octant/pkg/api OctantClient + +// ClientManager is an interface for managing clients. +type ClientManager interface { + Run(ctx context.Context) + Clients() []StreamingClient + ClientFromRequest(dashConfig config.Dash, w http.ResponseWriter, r *http.Request) (StreamingClient, error) + TemporaryClientFromLoadingRequest(w http.ResponseWriter, r *http.Request) (StreamingClient, error) + Get(id string) event.WSEventSender + Context() context.Context + ActionDispatcher() ActionDispatcher +} + +type StreamRequest struct { + Type string `json:"type"` + Payload action.Payload `json:"payload"` +} + +// OctantClient is the interface responsible for sending streaming data to a +// users session, usually in a browser. +type OctantClient interface { + Send(event.Event) + ID() string + StopCh() <-chan struct{} +} + +// StreamingClient is the interface responsible for sending and receiving +// streaming data to a users session, usually in a browser. +type StreamingClient interface { + OctantClient + + Receive() (StreamRequest, error) + + Handlers() map[string][]octant.ClientRequestHandler + State() octant.State +} + +type StreamingClientFactory interface { + NewConnection(uuid.UUID, http.ResponseWriter, *http.Request, ClientManager, config.Dash) (StreamingClient, context.CancelFunc, error) + NewTemporaryConnection(uuid.UUID, http.ResponseWriter, *http.Request, ClientManager) (StreamingClient, context.CancelFunc, error) +} diff --git a/internal/api/streaming_connection_manager.go b/pkg/api/streaming_connection_manager.go similarity index 70% rename from internal/api/streaming_connection_manager.go rename to pkg/api/streaming_connection_manager.go index 8b03151762..dd810924ca 100644 --- a/internal/api/streaming_connection_manager.go +++ b/pkg/api/streaming_connection_manager.go @@ -2,54 +2,19 @@ * Copyright (c) 2019 the Octant contributors. All Rights Reserved. * SPDX-License-Identifier: Apache-2.0 */ - package api import ( "context" "net/http" - "github.com/vmware-tanzu/octant/pkg/event" - "github.com/google/uuid" "github.com/vmware-tanzu/octant/internal/config" - "github.com/vmware-tanzu/octant/internal/octant" + "github.com/vmware-tanzu/octant/pkg/event" ) -//go:generate mockgen -destination=./fake/mock_client_manager.go -package=fake github.com/vmware-tanzu/octant/internal/api ClientManager -//go:generate mockgen -destination=./fake/mock_client_factory.go -package=fake github.com/vmware-tanzu/octant/internal/api StreamingClientFactory -//go:generate mockgen -destination=./fake/mock_streaming_client.go -package=fake github.com/vmware-tanzu/octant/internal/api StreamingClient - -// ClientManager is an interface for managing clients. -type ClientManager interface { - Run(ctx context.Context) - Clients() []StreamingClient - ClientFromRequest(dashConfig config.Dash, w http.ResponseWriter, r *http.Request) (StreamingClient, error) - TemporaryClientFromLoadingRequest(w http.ResponseWriter, r *http.Request) (StreamingClient, error) - Get(id string) event.WSEventSender -} - -type clientMeta struct { - cancelFunc context.CancelFunc - client StreamingClient -} - -type StreamingClientFactory interface { - NewConnection(uuid.UUID, http.ResponseWriter, *http.Request, *StreamingConnectionManager, config.Dash) (StreamingClient, context.CancelFunc, error) - NewTemporaryConnection(uuid.UUID, http.ResponseWriter, *http.Request, *StreamingConnectionManager) (StreamingClient, context.CancelFunc, error) -} - -// StreamingClient is the interface responsible for sending and receiving -// streaming data to a users session, usually in a browser. -type StreamingClient interface { - OctantClient - - Receive() (StreamRequest, error) - - Handlers() map[string][]octant.ClientRequestHandler - State() octant.State -} +var _ ClientManager = (*StreamingConnectionManager)(nil) // StreamingConnectionManager is a client manager for streams. type StreamingConnectionManager struct { @@ -72,7 +37,10 @@ type StreamingConnectionManager struct { actionDispatcher ActionDispatcher } -var _ ClientManager = (*StreamingConnectionManager)(nil) +type clientMeta struct { + cancelFunc context.CancelFunc + client StreamingClient +} // NewStreamingConnectionManager creates an instance of WebsocketClientManager. func NewStreamingConnectionManager(ctx context.Context, dispatcher ActionDispatcher, clientFactory StreamingClientFactory) *StreamingConnectionManager { @@ -168,3 +136,11 @@ func (m *StreamingConnectionManager) Get(id string) event.WSEventSender { } return nil } + +func (m *StreamingConnectionManager) Context() context.Context { + return m.ctx +} + +func (m *StreamingConnectionManager) ActionDispatcher() ActionDispatcher { + return m.actionDispatcher +} diff --git a/internal/api/websocket_client.go b/pkg/api/websockets/websocket_client.go similarity index 89% rename from internal/api/websocket_client.go rename to pkg/api/websockets/websocket_client.go index 1f47461e00..71fb50b2b6 100644 --- a/internal/api/websocket_client.go +++ b/pkg/api/websockets/websocket_client.go @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package api +package websockets import ( "context" @@ -12,7 +12,9 @@ import ( "time" "github.com/vmware-tanzu/octant/internal/util/json" + "github.com/vmware-tanzu/octant/pkg/api" "github.com/vmware-tanzu/octant/pkg/errors" + "github.com/vmware-tanzu/octant/pkg/log" "github.com/vmware-tanzu/octant/pkg/event" @@ -24,7 +26,6 @@ import ( internalLog "github.com/vmware-tanzu/octant/internal/log" "github.com/vmware-tanzu/octant/internal/octant" "github.com/vmware-tanzu/octant/pkg/action" - "github.com/vmware-tanzu/octant/pkg/log" ) const ( @@ -42,11 +43,6 @@ const ( maxMessageSize = 2 * 1024 * 1024 // 2MiB ) -type StreamRequest struct { - Type string `json:"type"` - Payload action.Payload `json:"payload"` -} - // WebsocketClient manages websocket clients. type WebsocketClient struct { conn *websocket.Conn @@ -55,7 +51,7 @@ type WebsocketClient struct { logger log.Logger ctx context.Context cancel context.CancelFunc - manager *StreamingConnectionManager + manager api.ClientManager isOpen atomic.Value state octant.State @@ -64,11 +60,8 @@ type WebsocketClient struct { stopCh chan struct{} } -var _ OctantClient = (*WebsocketClient)(nil) -var _ StreamingClient = (*WebsocketClient)(nil) - // NewWebsocketClient creates an instance of WebsocketClient. -func NewWebsocketClient(ctx context.Context, conn *websocket.Conn, manager *StreamingConnectionManager, dashConfig config.Dash, actionDispatcher ActionDispatcher, id uuid.UUID) *WebsocketClient { +func NewWebsocketClient(ctx context.Context, conn *websocket.Conn, manager api.ClientManager, dashConfig config.Dash, actionDispatcher api.ActionDispatcher, id uuid.UUID) *WebsocketClient { logger := dashConfig.Logger().With("component", "websocket-client", "client-id", id.String()) ctx = internalLog.WithLoggerContext(ctx, logger) @@ -105,7 +98,7 @@ func NewWebsocketClient(ctx context.Context, conn *websocket.Conn, manager *Stre } // NewTemporaryWebsocketClient creates an instance of WebsocketClient -func NewTemporaryWebsocketClient(ctx context.Context, conn *websocket.Conn, manager *StreamingConnectionManager, actionDispatcher ActionDispatcher, id uuid.UUID) *WebsocketClient { +func NewTemporaryWebsocketClient(ctx context.Context, conn *websocket.Conn, manager api.ClientManager, actionDispatcher api.ActionDispatcher, id uuid.UUID) *WebsocketClient { ctx, cancel := context.WithCancel(ctx) logger := internalLog.From(ctx) @@ -185,7 +178,7 @@ func (c *WebsocketClient) readPump() { close(c.stopCh) } -func handleStreamingMessage(client StreamingClient, request StreamRequest) error { +func handleStreamingMessage(client api.StreamingClient, request api.StreamRequest) error { handlers, ok := client.Handlers()[request.Type] if !ok { return handleUnknownRequest(client, request) @@ -210,7 +203,7 @@ func handleStreamingMessage(client StreamingClient, request StreamRequest) error return nil } -func handleUnknownRequest(client OctantClient, request StreamRequest) error { +func handleUnknownRequest(client api.OctantClient, request api.StreamRequest) error { message := "unknown request" if request.Type != "" { message = fmt.Sprintf("unknown request %s", request.Type) @@ -286,7 +279,7 @@ func (c *WebsocketClient) Send(ev event.Event) { } } -func (c *WebsocketClient) Receive() (StreamRequest, error) { +func (c *WebsocketClient) Receive() (api.StreamRequest, error) { _, message, err := c.conn.ReadMessage() if err != nil { if websocket.IsUnexpectedCloseError( @@ -294,13 +287,13 @@ func (c *WebsocketClient) Receive() (StreamRequest, error) { ) { c.logger.WithErr(err).Errorf("Unhandled websocket error") } - return StreamRequest{}, errors.FatalStreamError(err) + return api.StreamRequest{}, errors.FatalStreamError(err) } - var request StreamRequest + var request api.StreamRequest if err := json.Unmarshal(message, &request); err != nil { c.logger.WithErr(err).Errorf("Unmarshaling websocket message") - return StreamRequest{}, err + return api.StreamRequest{}, err } return request, nil diff --git a/internal/api/websocket_client_test.go b/pkg/api/websockets/websocket_client_test.go similarity index 98% rename from internal/api/websocket_client_test.go rename to pkg/api/websockets/websocket_client_test.go index 046de4306e..ce50f7d04b 100644 --- a/internal/api/websocket_client_test.go +++ b/pkg/api/websockets/websocket_client_test.go @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package api +package websockets import ( "testing" diff --git a/internal/api/websocket_connection_factory.go b/pkg/api/websockets/websocket_connection_factory.go similarity index 55% rename from internal/api/websocket_connection_factory.go rename to pkg/api/websockets/websocket_connection_factory.go index 18ddacc48b..f9dc054954 100644 --- a/internal/api/websocket_connection_factory.go +++ b/pkg/api/websockets/websocket_connection_factory.go @@ -1,4 +1,4 @@ -package api +package websockets import ( "context" @@ -8,7 +8,9 @@ import ( "github.com/google/uuid" "github.com/gorilla/websocket" + internalAPI "github.com/vmware-tanzu/octant/internal/api" "github.com/vmware-tanzu/octant/internal/config" + "github.com/vmware-tanzu/octant/pkg/api" ) type WebsocketConnectionFactory struct { @@ -26,24 +28,24 @@ func NewWebsocketConnectionFactory() *WebsocketConnectionFactory { return false } - return shouldAllowHost(host, acceptedHosts()) + return internalAPI.ShouldAllowHost(host, internalAPI.AcceptedHosts()) }, }, } } -var _ StreamingClientFactory = (*WebsocketConnectionFactory)(nil) +var _ api.StreamingClientFactory = (*WebsocketConnectionFactory)(nil) func (wcf *WebsocketConnectionFactory) NewConnection( - clientID uuid.UUID, w http.ResponseWriter, r *http.Request, m *StreamingConnectionManager, dashConfig config.Dash, -) (StreamingClient, context.CancelFunc, error) { + clientID uuid.UUID, w http.ResponseWriter, r *http.Request, m api.ClientManager, dashConfig config.Dash, +) (api.StreamingClient, context.CancelFunc, error) { conn, err := wcf.upgrader.Upgrade(w, r, nil) if err != nil { return nil, nil, err } - ctx, cancel := context.WithCancel(m.ctx) - client := NewWebsocketClient(ctx, conn, m, dashConfig, m.actionDispatcher, clientID) + ctx, cancel := context.WithCancel(m.Context()) + client := NewWebsocketClient(ctx, conn, m, dashConfig, m.ActionDispatcher(), clientID) go client.readPump() go client.writePump() @@ -52,15 +54,15 @@ func (wcf *WebsocketConnectionFactory) NewConnection( } func (wcf *WebsocketConnectionFactory) NewTemporaryConnection( - clientID uuid.UUID, w http.ResponseWriter, r *http.Request, m *StreamingConnectionManager, -) (StreamingClient, context.CancelFunc, error) { + clientID uuid.UUID, w http.ResponseWriter, r *http.Request, m api.ClientManager, +) (api.StreamingClient, context.CancelFunc, error) { conn, err := wcf.upgrader.Upgrade(w, r, nil) if err != nil { return nil, nil, err } - ctx, cancel := context.WithCancel(m.ctx) - client := NewTemporaryWebsocketClient(ctx, conn, m, m.actionDispatcher, clientID) + ctx, cancel := context.WithCancel(m.Context()) + client := NewTemporaryWebsocketClient(ctx, conn, m, m.ActionDispatcher(), clientID) go client.readPump() go client.writePump() diff --git a/internal/api/websocket_state.go b/pkg/api/websockets/websocket_state.go similarity index 85% rename from internal/api/websocket_state.go rename to pkg/api/websockets/websocket_state.go index fad736c576..5fd68a9196 100644 --- a/internal/api/websocket_state.go +++ b/pkg/api/websockets/websocket_state.go @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package api +package websockets import ( "context" @@ -14,8 +14,10 @@ import ( "strings" "sync" + internalAPI "github.com/vmware-tanzu/octant/internal/api" "github.com/vmware-tanzu/octant/internal/util/path_util" + "github.com/vmware-tanzu/octant/pkg/api" "github.com/vmware-tanzu/octant/pkg/event" "github.com/google/uuid" @@ -25,43 +27,26 @@ import ( "github.com/vmware-tanzu/octant/pkg/action" ) -//go:generate mockgen -destination=./fake/mock_state_manager.go -package=fake github.com/vmware-tanzu/octant/internal/api StateManager -//go:generate mockgen -destination=./fake/mock_octant_client.go -package=fake github.com/vmware-tanzu/octant/internal/api OctantClient - var ( reContentPathNamespace = regexp.MustCompile(`^/namespace/(?P[^/]+)/?`) ) -// StateManager manages states for WebsocketState. -type StateManager interface { - Handlers() []octant.ClientRequestHandler - Start(ctx context.Context, state octant.State, s OctantClient) -} - -func defaultStateManagers(clientID string, dashConfig config.Dash) []StateManager { +func defaultStateManagers(clientID string, dashConfig config.Dash) []api.StateManager { logger := dashConfig.Logger().With("client-id", clientID) - return []StateManager{ - NewContentManager(dashConfig.ModuleManager(), dashConfig, logger), - NewHelperStateManager(dashConfig), - NewFilterManager(), - NewNavigationManager(dashConfig), - NewNamespacesManager(dashConfig), - NewContextManager(dashConfig), - NewActionRequestManager(dashConfig), - NewTerminalStateManager(dashConfig), - NewPodLogsStateManager(dashConfig), + return []api.StateManager{ + internalAPI.NewContentManager(dashConfig.ModuleManager(), dashConfig, logger), + internalAPI.NewHelperStateManager(dashConfig), + internalAPI.NewFilterManager(), + internalAPI.NewNavigationManager(dashConfig), + internalAPI.NewNamespacesManager(dashConfig), + internalAPI.NewContextManager(dashConfig), + internalAPI.NewActionRequestManager(dashConfig), + internalAPI.NewTerminalStateManager(dashConfig), + internalAPI.NewPodLogsStateManager(dashConfig), } } -// OctantClient is the interface responsible for sending streaming data to a -// users session, usually in a browser. -type OctantClient interface { - Send(event.Event) - ID() string - StopCh() <-chan struct{} -} - type atomicString struct { mu sync.RWMutex s string @@ -92,7 +77,7 @@ func (s *atomicString) set(v string) { type WebsocketStateOption func(w *WebsocketState) // WebsocketStateManagers configures WebsocketState's state managers. -func WebsocketStateManagers(managers []StateManager) WebsocketStateOption { +func WebsocketStateManagers(managers []api.StateManager) WebsocketStateOption { return func(w *WebsocketState) { w.managers = managers } @@ -101,7 +86,7 @@ func WebsocketStateManagers(managers []StateManager) WebsocketStateOption { // WebsocketState manages state for a websocket client. type WebsocketState struct { dashConfig config.Dash - wsClient OctantClient + wsClient api.OctantClient contentPath *atomicString namespace *atomicString filters []octant.Filter @@ -109,8 +94,8 @@ type WebsocketState struct { namespaceUpdates map[string]octant.NamespaceUpdateFunc mu sync.RWMutex - managers []StateManager - actionDispatcher ActionDispatcher + managers []api.StateManager + actionDispatcher api.ActionDispatcher startCtx context.Context managersCancelFunc context.CancelFunc @@ -119,7 +104,7 @@ type WebsocketState struct { var _ octant.State = (*WebsocketState)(nil) // NewWebsocketState creates an instance of WebsocketState. -func NewWebsocketState(dashConfig config.Dash, actionDispatcher ActionDispatcher, wsClient OctantClient, options ...WebsocketStateOption) *WebsocketState { +func NewWebsocketState(dashConfig config.Dash, actionDispatcher api.ActionDispatcher, wsClient api.OctantClient, options ...WebsocketStateOption) *WebsocketState { defaultNamespace := dashConfig.DefaultNamespace() w := &WebsocketState{ @@ -144,7 +129,7 @@ func NewWebsocketState(dashConfig config.Dash, actionDispatcher ActionDispatcher return w } -func NewTemporaryWebsocketState(actionDispatcher ActionDispatcher, wsClient OctantClient, options ...WebsocketStateOption) *WebsocketState { +func NewTemporaryWebsocketState(actionDispatcher api.ActionDispatcher, wsClient api.OctantClient, options ...WebsocketStateOption) *WebsocketState { w := &WebsocketState{ wsClient: wsClient, contentPathUpdates: make(map[string]octant.ContentPathUpdateFunc), @@ -157,8 +142,8 @@ func NewTemporaryWebsocketState(actionDispatcher ActionDispatcher, wsClient Octa } if len(w.managers) < 1 { - w.managers = []StateManager{ - NewLoadingManager(), + w.managers = []api.StateManager{ + internalAPI.NewLoadingManager(), } } diff --git a/internal/api/websocket_state_test.go b/pkg/api/websockets/websocket_state_test.go similarity index 88% rename from internal/api/websocket_state_test.go rename to pkg/api/websockets/websocket_state_test.go index 739f4a95bf..83189746e7 100644 --- a/internal/api/websocket_state_test.go +++ b/pkg/api/websockets/websocket_state_test.go @@ -3,11 +3,10 @@ * SPDX-License-Identifier: Apache-2.0 */ -package api_test +package websockets_test import ( "context" - "sort" "testing" "time" @@ -15,12 +14,14 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "github.com/vmware-tanzu/octant/internal/api" - "github.com/vmware-tanzu/octant/internal/api/fake" + internalAPI "github.com/vmware-tanzu/octant/internal/api" configFake "github.com/vmware-tanzu/octant/internal/config/fake" "github.com/vmware-tanzu/octant/internal/log" moduleFake "github.com/vmware-tanzu/octant/internal/module/fake" "github.com/vmware-tanzu/octant/internal/octant" + "github.com/vmware-tanzu/octant/pkg/api" + "github.com/vmware-tanzu/octant/pkg/api/fake" + "github.com/vmware-tanzu/octant/pkg/api/websockets" ) func TestWebsocketState_Start(t *testing.T) { @@ -48,7 +49,7 @@ func TestWebsocketState_SetContentPath(t *testing.T) { contentPath string namespace string setup func(mocks *websocketStateMocks) - verify func(t *testing.T, s *api.WebsocketState) + verify func(t *testing.T, s *websockets.WebsocketState) }{ { name: "set content path without namespace change", @@ -60,7 +61,7 @@ func TestWebsocketState_SetContentPath(t *testing.T) { ModuleForContentPath(contentPath). Return(mocks.module, true) }, - verify: func(t *testing.T, s *api.WebsocketState) { + verify: func(t *testing.T, s *websockets.WebsocketState) { contentPath := "overview/namespace/default" assert.Equal(t, "default", s.GetNamespace()) assert.Equal(t, contentPath, s.GetContentPath()) @@ -76,7 +77,7 @@ func TestWebsocketState_SetContentPath(t *testing.T) { ModuleForContentPath(contentPath). Return(mocks.module, true) }, - verify: func(t *testing.T, s *api.WebsocketState) { + verify: func(t *testing.T, s *websockets.WebsocketState) { contentPath := "overview/foo" assert.Equal(t, "default", s.GetNamespace()) assert.Equal(t, contentPath, s.GetContentPath()) @@ -92,7 +93,7 @@ func TestWebsocketState_SetContentPath(t *testing.T) { ModuleForContentPath(contentPath). Return(mocks.module, true) }, - verify: func(t *testing.T, s *api.WebsocketState) { + verify: func(t *testing.T, s *websockets.WebsocketState) { contentPath := "overview/namespace/kube-system" assert.Equal(t, "kube-system", s.GetNamespace()) assert.Equal(t, contentPath, s.GetContentPath()) @@ -300,9 +301,9 @@ func newWebsocketStateMocks(t *testing.T, namespace string) *websocketStateMocks } } -func (w *websocketStateMocks) options() []api.WebsocketStateOption { - return []api.WebsocketStateOption{ - api.WebsocketStateManagers([]api.StateManager{w.stateManager}), +func (w *websocketStateMocks) options() []websockets.WebsocketStateOption { + return []websockets.WebsocketStateOption{ + websockets.WebsocketStateManagers([]internalAPI.StateManager{w.stateManager}), } } @@ -310,19 +311,7 @@ func (w *websocketStateMocks) finish() { w.controller.Finish() } -func (w *websocketStateMocks) factory() *api.WebsocketState { - return api.NewWebsocketState(w.dashConfig, w.actionDispatcher, w.wsClient, w.options()...) +func (w *websocketStateMocks) factory() *websockets.WebsocketState { + return websockets.NewWebsocketState(w.dashConfig, w.actionDispatcher, w.wsClient, w.options()...) } - -func AssertHandlers(t *testing.T, manager api.StateManager, expected []string) { - handlers := manager.Handlers() - var got []string - for _, h := range handlers { - got = append(got, h.RequestType) - } - sort.Strings(got) - sort.Strings(expected) - - assert.Equal(t, expected, got) -} diff --git a/pkg/dash/dash.go b/pkg/dash/dash.go index 38fa046831..1105e8ebd9 100644 --- a/pkg/dash/dash.go +++ b/pkg/dash/dash.go @@ -28,7 +28,7 @@ import ( "github.com/spf13/viper" "go.opencensus.io/trace" - "github.com/vmware-tanzu/octant/internal/api" + internalAPI "github.com/vmware-tanzu/octant/internal/api" "github.com/vmware-tanzu/octant/internal/cluster" "github.com/vmware-tanzu/octant/internal/config" ocontext "github.com/vmware-tanzu/octant/internal/context" @@ -46,6 +46,8 @@ import ( "github.com/vmware-tanzu/octant/internal/objectstore" "github.com/vmware-tanzu/octant/internal/portforward" "github.com/vmware-tanzu/octant/pkg/action" + "github.com/vmware-tanzu/octant/pkg/api" + "github.com/vmware-tanzu/octant/pkg/api/websockets" "github.com/vmware-tanzu/octant/pkg/log" "github.com/vmware-tanzu/octant/pkg/octant" "github.com/vmware-tanzu/octant/pkg/plugin" @@ -257,7 +259,7 @@ func NewRunner(ctx context.Context, logger log.Logger, opts ...RunnerOption) (*R if options.streamingClientFactory != nil { streamingConnectionManager = api.NewStreamingConnectionManager(ctx, r.actionManager, options.streamingClientFactory) } else { - streamingConnectionManager = api.NewStreamingConnectionManager(ctx, r.actionManager, api.NewWebsocketConnectionFactory()) + streamingConnectionManager = api.NewStreamingConnectionManager(ctx, r.actionManager, websockets.NewWebsocketConnectionFactory()) } r.streamingConnectionManager = streamingConnectionManager go streamingConnectionManager.Run(ctx) @@ -265,7 +267,7 @@ func NewRunner(ctx context.Context, logger log.Logger, opts ...RunnerOption) (*R var err error var pluginService *pluginAPI.GRPCService - var apiService api.Service + var apiService internalAPI.Service var apiErr error r.fs = afero.NewOsFs() @@ -293,7 +295,7 @@ func NewRunner(ctx context.Context, logger log.Logger, opts ...RunnerOption) (*R return &r, nil } -func (r *Runner) apiFromKubeConfig(kubeConfig string, opts ...RunnerOption) (api.Service, *pluginAPI.GRPCService, error) { +func (r *Runner) apiFromKubeConfig(kubeConfig string, opts ...RunnerOption) (internalAPI.Service, *pluginAPI.GRPCService, error) { logger := internalLog.From(r.ctx) validKubeConfig, err := ValidateKubeConfig(logger, kubeConfig, r.fs) if err == nil { @@ -301,7 +303,7 @@ func (r *Runner) apiFromKubeConfig(kubeConfig string, opts ...RunnerOption) (api return r.initAPI(r.ctx, logger, opts...) } else { logger.Infof("no valid kube config found, initializing loading API") - return api.NewLoadingAPI(r.ctx, api.PathPrefix, r.actionManager, r.streamingConnectionManager, logger), nil, nil + return internalAPI.NewLoadingAPI(r.ctx, internalAPI.PathPrefix, r.actionManager, r.streamingConnectionManager, logger), nil, nil } } @@ -361,7 +363,7 @@ func (r *Runner) Start(startupCh, shutdownCh chan bool, opts ...RunnerOption) er return nil } -func (r *Runner) initAPI(ctx context.Context, logger log.Logger, opts ...RunnerOption) (*api.API, *pluginAPI.GRPCService, error) { +func (r *Runner) initAPI(ctx context.Context, logger log.Logger, opts ...RunnerOption) (*internalAPI.API, *pluginAPI.GRPCService, error) { kubeConfigOptions := []kubeconfig.KubeConfigOption{} options := Options{} for _, opt := range opts { @@ -515,7 +517,7 @@ func (r *Runner) initAPI(ctx context.Context, logger log.Logger, opts ...RunnerO return nil, nil, fmt.Errorf("unable to start CRD watcher: %w", err) } - apiService := api.New(ctx, api.PathPrefix, r.actionManager, r.streamingConnectionManager, dashConfig) + apiService := internalAPI.New(ctx, internalAPI.PathPrefix, r.actionManager, r.streamingConnectionManager, dashConfig) frontendProxy.FrontendUpdateController = apiService r.apiCreated = true @@ -629,7 +631,7 @@ type dash struct { browserPath string namespace string defaultHandler func() (http.Handler, error) - apiHandler api.Service + apiHandler internalAPI.Service willOpenBrowser bool logger log.Logger handlerFactory *octant.HandlerFactory @@ -637,7 +639,7 @@ type dash struct { pluginService pluginAPI.Service } -func newDash(listener net.Listener, namespace, uiURL string, browserPath string, apiHandler api.Service, pluginHandler pluginAPI.Service, logger log.Logger) (*dash, error) { +func newDash(listener net.Listener, namespace, uiURL string, browserPath string, apiHandler internalAPI.Service, pluginHandler pluginAPI.Service, logger log.Logger) (*dash, error) { hf := octant.NewHandlerFactory( octant.BackendHandler(apiHandler.Handler), octant.FrontendURL(viper.GetString("proxy-frontend"))) @@ -657,7 +659,7 @@ func newDash(listener net.Listener, namespace, uiURL string, browserPath string, }, nil } -func (d *dash) SetAPIService(ctx context.Context, apiService api.Service) error { +func (d *dash) SetAPIService(ctx context.Context, apiService internalAPI.Service) error { d.apiHandler = apiService hf := octant.NewHandlerFactory( octant.BackendHandler(d.apiHandler.Handler), From 4834ebc37ec49f0e4217c2c114d41f219269a1f8 Mon Sep 17 00:00:00 2001 From: Michael Stergianis Date: Wed, 9 Jun 2021 17:11:05 -0400 Subject: [PATCH 7/9] Make UUID generation a responsibility of StreamingClientFactory Signed-off-by: Michael Stergianis --- pkg/api/fake/mock_client_factory.go | 17 ++++++++--------- pkg/api/streaming.go | 6 ++---- pkg/api/streaming_connection_manager.go | 15 ++------------- .../websockets/websocket_connection_factory.go | 14 ++++++++++++-- 4 files changed, 24 insertions(+), 28 deletions(-) diff --git a/pkg/api/fake/mock_client_factory.go b/pkg/api/fake/mock_client_factory.go index 8edd079b3c..4b70ca13bf 100644 --- a/pkg/api/fake/mock_client_factory.go +++ b/pkg/api/fake/mock_client_factory.go @@ -10,7 +10,6 @@ import ( reflect "reflect" gomock "github.com/golang/mock/gomock" - uuid "github.com/google/uuid" config "github.com/vmware-tanzu/octant/internal/config" api "github.com/vmware-tanzu/octant/pkg/api" @@ -40,9 +39,9 @@ func (m *MockStreamingClientFactory) EXPECT() *MockStreamingClientFactoryMockRec } // NewConnection mocks base method -func (m *MockStreamingClientFactory) NewConnection(arg0 uuid.UUID, arg1 http.ResponseWriter, arg2 *http.Request, arg3 api.ClientManager, arg4 config.Dash) (api.StreamingClient, context.CancelFunc, error) { +func (m *MockStreamingClientFactory) NewConnection(arg0 http.ResponseWriter, arg1 *http.Request, arg2 api.ClientManager, arg3 config.Dash) (api.StreamingClient, context.CancelFunc, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "NewConnection", arg0, arg1, arg2, arg3, arg4) + ret := m.ctrl.Call(m, "NewConnection", arg0, arg1, arg2, arg3) ret0, _ := ret[0].(api.StreamingClient) ret1, _ := ret[1].(context.CancelFunc) ret2, _ := ret[2].(error) @@ -50,15 +49,15 @@ func (m *MockStreamingClientFactory) NewConnection(arg0 uuid.UUID, arg1 http.Res } // NewConnection indicates an expected call of NewConnection -func (mr *MockStreamingClientFactoryMockRecorder) NewConnection(arg0, arg1, arg2, arg3, arg4 interface{}) *gomock.Call { +func (mr *MockStreamingClientFactoryMockRecorder) NewConnection(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NewConnection", reflect.TypeOf((*MockStreamingClientFactory)(nil).NewConnection), arg0, arg1, arg2, arg3, arg4) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NewConnection", reflect.TypeOf((*MockStreamingClientFactory)(nil).NewConnection), arg0, arg1, arg2, arg3) } // NewTemporaryConnection mocks base method -func (m *MockStreamingClientFactory) NewTemporaryConnection(arg0 uuid.UUID, arg1 http.ResponseWriter, arg2 *http.Request, arg3 api.ClientManager) (api.StreamingClient, context.CancelFunc, error) { +func (m *MockStreamingClientFactory) NewTemporaryConnection(arg0 http.ResponseWriter, arg1 *http.Request, arg2 api.ClientManager) (api.StreamingClient, context.CancelFunc, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "NewTemporaryConnection", arg0, arg1, arg2, arg3) + ret := m.ctrl.Call(m, "NewTemporaryConnection", arg0, arg1, arg2) ret0, _ := ret[0].(api.StreamingClient) ret1, _ := ret[1].(context.CancelFunc) ret2, _ := ret[2].(error) @@ -66,7 +65,7 @@ func (m *MockStreamingClientFactory) NewTemporaryConnection(arg0 uuid.UUID, arg1 } // NewTemporaryConnection indicates an expected call of NewTemporaryConnection -func (mr *MockStreamingClientFactoryMockRecorder) NewTemporaryConnection(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { +func (mr *MockStreamingClientFactoryMockRecorder) NewTemporaryConnection(arg0, arg1, arg2 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NewTemporaryConnection", reflect.TypeOf((*MockStreamingClientFactory)(nil).NewTemporaryConnection), arg0, arg1, arg2, arg3) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NewTemporaryConnection", reflect.TypeOf((*MockStreamingClientFactory)(nil).NewTemporaryConnection), arg0, arg1, arg2) } diff --git a/pkg/api/streaming.go b/pkg/api/streaming.go index 0bf1b765a1..a7ccdcce74 100644 --- a/pkg/api/streaming.go +++ b/pkg/api/streaming.go @@ -8,8 +8,6 @@ import ( "context" "net/http" - "github.com/google/uuid" - "github.com/vmware-tanzu/octant/internal/config" "github.com/vmware-tanzu/octant/internal/octant" "github.com/vmware-tanzu/octant/pkg/action" @@ -57,6 +55,6 @@ type StreamingClient interface { } type StreamingClientFactory interface { - NewConnection(uuid.UUID, http.ResponseWriter, *http.Request, ClientManager, config.Dash) (StreamingClient, context.CancelFunc, error) - NewTemporaryConnection(uuid.UUID, http.ResponseWriter, *http.Request, ClientManager) (StreamingClient, context.CancelFunc, error) + NewConnection(http.ResponseWriter, *http.Request, ClientManager, config.Dash) (StreamingClient, context.CancelFunc, error) + NewTemporaryConnection(http.ResponseWriter, *http.Request, ClientManager) (StreamingClient, context.CancelFunc, error) } diff --git a/pkg/api/streaming_connection_manager.go b/pkg/api/streaming_connection_manager.go index dd810924ca..e05b922a78 100644 --- a/pkg/api/streaming_connection_manager.go +++ b/pkg/api/streaming_connection_manager.go @@ -8,8 +8,6 @@ import ( "context" "net/http" - "github.com/google/uuid" - "github.com/vmware-tanzu/octant/internal/config" "github.com/vmware-tanzu/octant/pkg/event" ) @@ -87,12 +85,8 @@ func (m *StreamingConnectionManager) Run(ctx context.Context) { // ClientFromRequest creates a websocket client from a http request. func (m *StreamingConnectionManager) ClientFromRequest(dashConfig config.Dash, w http.ResponseWriter, r *http.Request) (StreamingClient, error) { - clientID, err := uuid.NewUUID() - if err != nil { - return nil, err - } - client, cancel, err := m.clientFactory.NewConnection(clientID, w, r, m, dashConfig) + client, cancel, err := m.clientFactory.NewConnection(w, r, m, dashConfig) if err != nil { return nil, err } @@ -108,12 +102,7 @@ func (m *StreamingConnectionManager) ClientFromRequest(dashConfig config.Dash, w } func (m *StreamingConnectionManager) TemporaryClientFromLoadingRequest(w http.ResponseWriter, r *http.Request) (StreamingClient, error) { - clientID, err := uuid.NewUUID() - if err != nil { - return nil, err - } - - client, cancel, err := m.clientFactory.NewTemporaryConnection(clientID, w, r, m) + client, cancel, err := m.clientFactory.NewTemporaryConnection(w, r, m) if err != nil { return nil, err } diff --git a/pkg/api/websockets/websocket_connection_factory.go b/pkg/api/websockets/websocket_connection_factory.go index f9dc054954..74c1cd58a4 100644 --- a/pkg/api/websockets/websocket_connection_factory.go +++ b/pkg/api/websockets/websocket_connection_factory.go @@ -37,8 +37,13 @@ func NewWebsocketConnectionFactory() *WebsocketConnectionFactory { var _ api.StreamingClientFactory = (*WebsocketConnectionFactory)(nil) func (wcf *WebsocketConnectionFactory) NewConnection( - clientID uuid.UUID, w http.ResponseWriter, r *http.Request, m api.ClientManager, dashConfig config.Dash, + w http.ResponseWriter, r *http.Request, m api.ClientManager, dashConfig config.Dash, ) (api.StreamingClient, context.CancelFunc, error) { + clientID, err := uuid.NewUUID() + if err != nil { + return nil, nil, err + } + conn, err := wcf.upgrader.Upgrade(w, r, nil) if err != nil { return nil, nil, err @@ -54,8 +59,13 @@ func (wcf *WebsocketConnectionFactory) NewConnection( } func (wcf *WebsocketConnectionFactory) NewTemporaryConnection( - clientID uuid.UUID, w http.ResponseWriter, r *http.Request, m api.ClientManager, + w http.ResponseWriter, r *http.Request, m api.ClientManager, ) (api.StreamingClient, context.CancelFunc, error) { + clientID, err := uuid.NewUUID() + if err != nil { + return nil, nil, err + } + conn, err := wcf.upgrader.Upgrade(w, r, nil) if err != nil { return nil, nil, err From 8aa7979ff97bfb0cf39f5905bed36201243ddf10 Mon Sep 17 00:00:00 2001 From: Wayne Witzel III Date: Tue, 22 Jun 2021 18:41:09 -0400 Subject: [PATCH 8/9] update import paths Signed-off-by: Wayne Witzel III --- internal/api/helper_manager.go | 4 ++-- internal/api/middleware.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/internal/api/helper_manager.go b/internal/api/helper_manager.go index 1203b617a4..acec2d2fc8 100644 --- a/internal/api/helper_manager.go +++ b/internal/api/helper_manager.go @@ -44,7 +44,7 @@ type HelperStateManager struct { poller Poller } -var _StateManager = (*HelperStateManager)(nil) +var _ = (*HelperStateManager)(nil) // NewHelperStateManager creates an instance of HelperStateManager func NewHelperStateManager(dashConfig config.Dash, options ...HelperStateManagerOption) *HelperStateManager { @@ -72,7 +72,7 @@ func (h *HelperStateManager) Start(ctx context.Context, state octant.State, clie h.poller.Run(ctx, nil, h.runUpdate(state, client), event.DefaultScheduleDelay) } -func (h *HelperStateManager) runUpdate(state octant.State, client OctantClient) PollerFunc { +func (h *HelperStateManager) runUpdate(state octant.State, client api.OctantClient) PollerFunc { var buildInfoGenerated, kubeConfigPathGenerated bool return func(ctx context.Context) bool { diff --git a/internal/api/middleware.go b/internal/api/middleware.go index f971793c05..614ea8a29b 100644 --- a/internal/api/middleware.go +++ b/internal/api/middleware.go @@ -107,7 +107,7 @@ func rebindHandler(ctx context.Context, acceptedHosts []string) mux.MiddlewareFu } var httpErrors []string - if !shouldAllowHost(host, acceptedHosts) { + if !ShouldAllowHost(host, acceptedHosts) { logger := log.From(ctx) logger.Debugf("Requester %s not in accepted hosts: %s\nTo allow this host add it to the OCTANT_ACCEPTED_HOSTS environment variable.", host, acceptedHosts) httpErrors = append(httpErrors, "forbidden host") From 47b0b54d4c9d18e127ebcb4774ad47e43a51cd0e Mon Sep 17 00:00:00 2001 From: Wayne Witzel III Date: Tue, 22 Jun 2021 19:04:52 -0400 Subject: [PATCH 9/9] add compile time StateManager interface check Signed-off-by: Wayne Witzel III --- internal/api/helper_manager.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/api/helper_manager.go b/internal/api/helper_manager.go index acec2d2fc8..6f3ab3bcf2 100644 --- a/internal/api/helper_manager.go +++ b/internal/api/helper_manager.go @@ -44,7 +44,7 @@ type HelperStateManager struct { poller Poller } -var _ = (*HelperStateManager)(nil) +var _ StateManager = (*HelperStateManager)(nil) // NewHelperStateManager creates an instance of HelperStateManager func NewHelperStateManager(dashConfig config.Dash, options ...HelperStateManagerOption) *HelperStateManager {