Skip to content

Commit

Permalink
Removed StartColocationGroup from the pipe API.
Browse files Browse the repository at this point in the history
Consider a component A that calls weaver.Get on a different component B.
Before this PR, the weavelet hosting A would send two messages over the
pipe to the envelope.

1. It would send a StartComponent message indicating that the B
   component should be started by B's colocation group.
2. It would send a StartColocationGroup message indicating that B's
   colocation group should be started, if it hasn't already.

This PR deletes the unneeded StartColocationGroup message. When a
weavelet sends a StartComponent message, the envelope now registers that
the component should be started *and* starts its colocation group, if it
hasn't already. This simplifies the pipe API and makes implementing
deployers a little bit easier.

I also removed some unneeded fields in the StartComponent proto.
  • Loading branch information
mwhittaker committed Mar 13, 2023
1 parent d515cd7 commit f287c0f
Show file tree
Hide file tree
Showing 17 changed files with 536 additions and 646 deletions.
5 changes: 0 additions & 5 deletions env.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,6 @@ type env interface {
// GetWeaveletInfo returns the weavelet information from the environment.
GetWeaveletInfo() *protos.WeaveletInfo

// StartColocationGroup starts running the specified colocation group,
// if it's not already started.
// Succeeds without an error if it has already been started.
StartColocationGroup(context.Context, *protos.ColocationGroup) error

// RegisterComponentToStart registers a component to start in a given
// target colocation group.
//
Expand Down
7 changes: 5 additions & 2 deletions godeps.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ github.com/ServiceWeaver/weaver
github.com/DataDog/hyperloglog
github.com/ServiceWeaver/weaver/internal/cond
github.com/ServiceWeaver/weaver/internal/envelope/conn
github.com/ServiceWeaver/weaver/internal/files
github.com/ServiceWeaver/weaver/internal/logtype
github.com/ServiceWeaver/weaver/internal/metrics
github.com/ServiceWeaver/weaver/internal/net/call
Expand All @@ -19,7 +20,6 @@ github.com/ServiceWeaver/weaver
github.com/ServiceWeaver/weaver/runtime/logging
github.com/ServiceWeaver/weaver/runtime/metrics
github.com/ServiceWeaver/weaver/runtime/perfetto
github.com/ServiceWeaver/weaver/runtime/protomsg
github.com/ServiceWeaver/weaver/runtime/protos
github.com/ServiceWeaver/weaver/runtime/retry
github.com/google/uuid
Expand Down Expand Up @@ -511,6 +511,7 @@ github.com/ServiceWeaver/weaver/internal/tool/multi
flag
fmt
github.com/ServiceWeaver/weaver/internal/babysitter
github.com/ServiceWeaver/weaver/internal/files
github.com/ServiceWeaver/weaver/internal/status
github.com/ServiceWeaver/weaver/runtime
github.com/ServiceWeaver/weaver/runtime/codegen
Expand All @@ -530,6 +531,7 @@ github.com/ServiceWeaver/weaver/internal/tool/multi
github.com/ServiceWeaver/weaver/internal/tool/single
context
fmt
github.com/ServiceWeaver/weaver/internal/files
github.com/ServiceWeaver/weaver/internal/status
github.com/ServiceWeaver/weaver/runtime/tool
path/filepath
Expand Down Expand Up @@ -559,6 +561,7 @@ github.com/ServiceWeaver/weaver/internal/tool/ssh/impl
context
errors
fmt
github.com/ServiceWeaver/weaver/internal/files
github.com/ServiceWeaver/weaver/internal/logtype
github.com/ServiceWeaver/weaver/internal/metrics
github.com/ServiceWeaver/weaver/internal/proto
Expand Down Expand Up @@ -724,6 +727,7 @@ github.com/ServiceWeaver/weaver/runtime/perfetto
encoding/json
errors
fmt
github.com/ServiceWeaver/weaver/internal/files
github.com/ServiceWeaver/weaver/internal/traceio
github.com/ServiceWeaver/weaver/runtime/logging
github.com/ServiceWeaver/weaver/runtime/retry
Expand All @@ -735,7 +739,6 @@ github.com/ServiceWeaver/weaver/runtime/perfetto
modernc.org/sqlite/lib
net
net/http
os
path/filepath
strconv
time
Expand Down
14 changes: 6 additions & 8 deletions internal/babysitter/babysitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,11 +141,7 @@ func (b *Babysitter) RegisterStatusPages(mux *http.ServeMux) {
status.RegisterServer(mux, b, b.logger)
}

// StartColocationGroup implements the protos.EnvelopeHandler interface.
func (b *Babysitter) StartColocationGroup(group *protos.ColocationGroup) error {
b.mu.Lock()
defer b.mu.Unlock()

func (b *Babysitter) startColocationGroup(group *protos.ColocationGroup) error {
envelopes, ok := b.managed[group.Name]
if ok && len(envelopes) == DefaultReplication {
// Already started.
Expand Down Expand Up @@ -202,8 +198,8 @@ func (b *Babysitter) StartComponent(req *protos.ComponentToStart) error {
if _, ok := g.Assignments[req.Component]; !ok {
// Create an initial assignment for the component.
g.Assignments[req.Component] = &protos.Assignment{
App: req.App,
DeploymentId: req.DeploymentId,
App: b.dep.App.Name,
DeploymentId: b.dep.Id,
Component: req.Component,
}
}
Expand All @@ -214,7 +210,9 @@ func (b *Babysitter) StartComponent(req *protos.ComponentToStart) error {

// Store app state
b.appState.Update(appVersionStateKey, state)
return nil

// Start the colocation group, if it hasn't already started.
return b.startColocationGroup(&protos.ColocationGroup{Name: req.ColocationGroup})
}

// RegisterReplica implements the protos.EnvelopeHandler interface.
Expand Down
11 changes: 5 additions & 6 deletions internal/envelope/conn/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,12 +145,11 @@ type handlerForTest struct{}

var _ conn.EnvelopeHandler = &handlerForTest{}

func (h *handlerForTest) RecvTraceSpans([]trace.ReadOnlySpan) error { return nil }
func (h *handlerForTest) RecvLogEntry(*protos.LogEntry) {}
func (h *handlerForTest) StartComponent(*protos.ComponentToStart) error { return nil }
func (h *handlerForTest) RegisterReplica(*protos.ReplicaToRegister) error { return nil }
func (h *handlerForTest) StartColocationGroup(*protos.ColocationGroup) error { return nil }
func (h *handlerForTest) ReportLoad(*protos.WeaveletLoadReport) error { return nil }
func (h *handlerForTest) RecvTraceSpans([]trace.ReadOnlySpan) error { return nil }
func (h *handlerForTest) RecvLogEntry(*protos.LogEntry) {}
func (h *handlerForTest) StartComponent(*protos.ComponentToStart) error { return nil }
func (h *handlerForTest) RegisterReplica(*protos.ReplicaToRegister) error { return nil }
func (h *handlerForTest) ReportLoad(*protos.WeaveletLoadReport) error { return nil }
func (h *handlerForTest) GetRoutingInfo(*protos.GetRoutingInfo) (*protos.RoutingInfo, error) {
return nil, nil
}
Expand Down
5 changes: 0 additions & 5 deletions internal/envelope/conn/envelope_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,6 @@ type EnvelopeHandler interface {
// StartComponent starts the given component.
StartComponent(entry *protos.ComponentToStart) error

// StartColocationGroup starts the given colocation group.
StartColocationGroup(entry *protos.ColocationGroup) error

// RegisterReplica registers the given weavelet replica.
RegisterReplica(entry *protos.ReplicaToRegister) error

Expand Down Expand Up @@ -117,8 +114,6 @@ func (e *EnvelopeConn) handleMessage(msg *protos.WeaveletMsg) error {
switch {
case msg.ComponentToStart != nil:
return e.send(errReply(e.handler.StartComponent(msg.ComponentToStart)))
case msg.ColocationGroupToStart != nil:
return e.send(errReply(e.handler.StartColocationGroup(msg.ColocationGroupToStart)))
case msg.ReplicaToRegister != nil:
return e.send(errReply(e.handler.RegisterReplica(msg.ReplicaToRegister)))
case msg.LoadReport != nil:
Expand Down
9 changes: 4 additions & 5 deletions internal/envelope/conn/trace_readwrite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,10 @@ func (p *pipeForTest) RecvTraceSpans(spans []sdk.ReadOnlySpan) error {
return nil
}

func (p *pipeForTest) RecvLogEntry(*protos.LogEntry) {}
func (p *pipeForTest) StartComponent(*protos.ComponentToStart) error { return nil }
func (p *pipeForTest) RegisterReplica(*protos.ReplicaToRegister) error { return nil }
func (p *pipeForTest) StartColocationGroup(*protos.ColocationGroup) error { return nil }
func (p *pipeForTest) ReportLoad(*protos.WeaveletLoadReport) error { return nil }
func (p *pipeForTest) RecvLogEntry(*protos.LogEntry) {}
func (p *pipeForTest) StartComponent(*protos.ComponentToStart) error { return nil }
func (p *pipeForTest) RegisterReplica(*protos.ReplicaToRegister) error { return nil }
func (p *pipeForTest) ReportLoad(*protos.WeaveletLoadReport) error { return nil }
func (p *pipeForTest) GetRoutingInfo(*protos.GetRoutingInfo) (*protos.RoutingInfo, error) {
return nil, nil
}
Expand Down
7 changes: 0 additions & 7 deletions internal/envelope/conn/weavelet_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,13 +130,6 @@ func (d *WeaveletConn) handleMessage(msg *protos.EnvelopeMsg) error {
}
}

// StartColocationGroupRPC requests the envelope to start the given
// colocation group.
func (d *WeaveletConn) StartColocationGroupRPC(group *protos.ColocationGroup) error {
_, err := d.rpc(&protos.WeaveletMsg{ColocationGroupToStart: group})
return err
}

// StartComponentRPC requests the envelope to start the given component.
func (d *WeaveletConn) StartComponentRPC(componentToStart *protos.ComponentToStart) error {
_, err := d.rpc(&protos.WeaveletMsg{ComponentToStart: componentToStart})
Expand Down
5 changes: 0 additions & 5 deletions internal/tool/multi/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,16 +108,11 @@ func deploy(ctx context.Context, args []string) error {
// Deploy main.
group := &protos.ColocationGroup{Name: "main"}
if err := b.StartComponent(&protos.ComponentToStart{
App: dep.App.Name,
DeploymentId: dep.Id,
ColocationGroup: group.Name,
Component: "main",
}); err != nil {
return fmt.Errorf("start main process: %w", err)
}
if err := b.StartColocationGroup(group); err != nil {
return fmt.Errorf("start main group: %w", err)
}

// Wait for the status server to become active.
client := status.NewClient(lis.Addr().String())
Expand Down
11 changes: 0 additions & 11 deletions internal/tool/ssh/impl/babysitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,17 +155,6 @@ func (b *babysitter) StartComponent(req *protos.ComponentToStart) error {
})
}

// StartColocationGroup implements the protos.EnvelopeHandler interface.
func (b *babysitter) StartColocationGroup(req *protos.ColocationGroup) error {
b.logger.Debug("Starting colocation group", "group", req.Name)
return protomsg.Call(b.ctx, protomsg.CallArgs{
Client: http.DefaultClient,
Addr: b.mgrAddr,
URLPath: startColocationGroupURL,
Request: req,
})
}

// RegisterReplica implements the protos.EnvelopeHandler interface.
func (b *babysitter) RegisterReplica(replica *protos.ReplicaToRegister) error {
b.logger.Info("Replica (re)started with new address",
Expand Down
17 changes: 6 additions & 11 deletions internal/tool/ssh/impl/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ const (
exportListenerURL = "/manager/export_listener"
startComponentURL = "/manager/start_component"
getRoutingInfoURL = "/manager/get_routing_info"
startColocationGroupURL = "/manager/start_colocation_group"
recvLogEntryURL = "/manager/recv_log_entry"
recvTraceSpansURL = "/manager/recv_trace_spans"
recvMetricsURL = "/manager/recv_metrics"
Expand Down Expand Up @@ -219,8 +218,6 @@ func (m *manager) run() error {
// Start the main process.
group := &protos.ColocationGroup{Name: "main"}
if err := m.startComponent(m.ctx, &protos.ComponentToStart{
App: m.dep.App.Name,
DeploymentId: m.dep.Id,
ColocationGroup: group.Name,
Component: "main",
}); err != nil {
Expand Down Expand Up @@ -262,7 +259,6 @@ func (m *manager) addHTTPHandlers(mux *http.ServeMux) {
mux.HandleFunc(exportListenerURL, protomsg.HandlerFunc(m.logger, m.exportListener))
mux.HandleFunc(startComponentURL, protomsg.HandlerDo(m.logger, m.startComponent))
mux.HandleFunc(getRoutingInfoURL, protomsg.HandlerFunc(m.logger, m.getRoutingInfo))
mux.HandleFunc(startColocationGroupURL, protomsg.HandlerDo(m.logger, m.startColocationGroup))
mux.HandleFunc(recvLogEntryURL, protomsg.HandlerDo(m.logger, m.handleLogEntry))
mux.HandleFunc(recvTraceSpansURL, protomsg.HandlerDo(m.logger, m.handleTraceSpans))
mux.HandleFunc(recvMetricsURL, protomsg.HandlerDo(m.logger, m.handleRecvMetrics))
Expand Down Expand Up @@ -451,7 +447,7 @@ func (m *manager) exportListener(_ context.Context, req *protos.ExportListenerRe
return &protos.ExportListenerReply{ProxyAddress: addr}, nil
}

func (m *manager) startComponent(_ context.Context, req *protos.ComponentToStart) error {
func (m *manager) startComponent(ctx context.Context, req *protos.ComponentToStart) error {
m.mu.Lock()
defer m.mu.Unlock()

Expand All @@ -468,8 +464,8 @@ func (m *manager) startComponent(_ context.Context, req *protos.ComponentToStart
if _, ok := g.Assignments[req.Component]; !ok {
// Create an initial assignment for the component.
g.Assignments[req.Component] = &protos.Assignment{
App: req.App,
DeploymentId: req.DeploymentId,
App: m.dep.App.Name,
DeploymentId: m.dep.Id,
Component: req.Component,
}
}
Expand All @@ -480,13 +476,12 @@ func (m *manager) startComponent(_ context.Context, req *protos.ComponentToStart

// Store app state
m.appState.Update(appVersionStateKey, state)
return nil

// Start the colocation group, if it hasn't started already.
return m.startColocationGroup(ctx, &protos.ColocationGroup{Name: req.ColocationGroup})
}

func (m *manager) startColocationGroup(_ context.Context, group *protos.ColocationGroup) error {
m.mu.Lock()
defer m.mu.Unlock()

// If the group is already started, ignore.
if _, found := m.started[group.Name]; found {
return nil
Expand Down
9 changes: 0 additions & 9 deletions remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"github.com/ServiceWeaver/weaver/internal/net/call"
"github.com/ServiceWeaver/weaver/internal/traceio"
"github.com/ServiceWeaver/weaver/runtime"
"github.com/ServiceWeaver/weaver/runtime/protomsg"
"github.com/ServiceWeaver/weaver/runtime/protos"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
)
Expand Down Expand Up @@ -73,17 +72,9 @@ func (e *remoteEnv) GetWeaveletInfo() *protos.WeaveletInfo {
return e.weavelet
}

// StartColocationGroup implements the Env interface.
func (e *remoteEnv) StartColocationGroup(_ context.Context, targetGroup *protos.ColocationGroup) error {
request := protomsg.Clone(targetGroup)
return e.conn.StartColocationGroupRPC(request)
}

// RegisterComponentToStart implements the Env interface.
func (e *remoteEnv) RegisterComponentToStart(_ context.Context, targetGroup string, component string, isRouted bool) error {
request := &protos.ComponentToStart{
App: e.weavelet.App,
DeploymentId: e.weavelet.DeploymentId,
ColocationGroup: targetGroup,
Component: component,
IsRouted: isRouted,
Expand Down
3 changes: 0 additions & 3 deletions runtime/envelope/envelope.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,6 @@ type EnvelopeHandler interface {
// StartComponent starts the given component.
StartComponent(entry *protos.ComponentToStart) error

// StartColocationGroup starts the given colocation group.
StartColocationGroup(entry *protos.ColocationGroup) error

// RegisterReplica registers the given weavelet replica.
RegisterReplica(entry *protos.ReplicaToRegister) error

Expand Down
9 changes: 4 additions & 5 deletions runtime/envelope/envelope_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,11 +188,10 @@ func (h *handlerForTest) getTraceSpanNames() []string {
return h.traces
}

func (h *handlerForTest) RecvLogEntry(entry *protos.LogEntry) { h.logSaver(entry) }
func (h *handlerForTest) StartComponent(*protos.ComponentToStart) error { return nil }
func (h *handlerForTest) RegisterReplica(*protos.ReplicaToRegister) error { return nil }
func (h *handlerForTest) StartColocationGroup(*protos.ColocationGroup) error { return nil }
func (h *handlerForTest) ReportLoad(*protos.WeaveletLoadReport) error { return nil }
func (h *handlerForTest) RecvLogEntry(entry *protos.LogEntry) { h.logSaver(entry) }
func (h *handlerForTest) StartComponent(*protos.ComponentToStart) error { return nil }
func (h *handlerForTest) RegisterReplica(*protos.ReplicaToRegister) error { return nil }
func (h *handlerForTest) ReportLoad(*protos.WeaveletLoadReport) error { return nil }
func (h *handlerForTest) GetRoutingInfo(*protos.GetRoutingInfo) (*protos.RoutingInfo, error) {
return nil, nil
}
Expand Down
Loading

0 comments on commit f287c0f

Please sign in to comment.