Skip to content

Commit

Permalink
Change load report from push to pull
Browse files Browse the repository at this point in the history
  • Loading branch information
rgrandl authored and mwhittaker committed Mar 22, 2023
1 parent 7e1b0b7 commit ff5ec4f
Show file tree
Hide file tree
Showing 10 changed files with 588 additions and 560 deletions.
4 changes: 2 additions & 2 deletions env.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,13 @@ type env interface {
}

// getEnv returns the env to use for this weavelet.
func getEnv(ctx context.Context) (env, error) {
func getEnv(ctx context.Context, handler WeaveletHandler) (env, error) {
bootstrap, err := runtime.GetBootstrap(ctx)
if err != nil {
return nil, err
}
if !bootstrap.HasPipes() {
return newSingleprocessEnv(bootstrap)
}
return newRemoteEnv(ctx, bootstrap)
return newRemoteEnv(ctx, bootstrap, handler)
}
4 changes: 2 additions & 2 deletions internal/envelope/conn/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func TestMetricPropagation(t *testing.T) {
func makeConnections(t *testing.T, handler conn.EnvelopeHandler) (*conn.EnvelopeConn, *conn.WeaveletConn) {
t.Helper()

// Create the pipes. Note that We use os.Pipe instead of io.Pipe. The pipes
// Create the pipes. Note that we use os.Pipe instead of io.Pipe. The pipes
// returned by io.Pipe are synchronous, meaning that a write blocks until a
// corresponding read (or set of reads) reads the written bytes. This
// behavior is unlike the normal pipe behavior and complicates things, so
Expand Down Expand Up @@ -111,7 +111,7 @@ func makeConnections(t *testing.T, handler conn.EnvelopeHandler) (*conn.Envelope
if err != nil {
t.Fatal(err)
}
w, err := conn.NewWeaveletConn(wReader, wWriter)
w, err := conn.NewWeaveletConn(wReader, wWriter, nil /*handler*/)
if err != nil {
t.Fatal(err)
}
Expand Down
17 changes: 12 additions & 5 deletions internal/envelope/conn/envelope_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,6 @@ type EnvelopeHandler interface {
// RegisterReplica registers the given weavelet replica.
RegisterReplica(entry *protos.ReplicaToRegister) error

// ReportLoad reports the given weavelet load information.
ReportLoad(entry *protos.WeaveletLoadReport) error

// GetAddress gets the address a weavelet should listen on for a listener.
GetAddress(req *protos.GetAddressRequest) (*protos.GetAddressReply, error)

Expand Down Expand Up @@ -116,8 +113,6 @@ func (e *EnvelopeConn) handleMessage(msg *protos.WeaveletMsg) error {
return e.send(errReply(e.handler.StartComponent(msg.ComponentToStart)))
case msg.ReplicaToRegister != nil:
return e.send(errReply(e.handler.RegisterReplica(msg.ReplicaToRegister)))
case msg.LoadReport != nil:
return e.send(errReply(e.handler.ReportLoad(msg.LoadReport)))
case msg.GetAddressRequest != nil:
reply, err := e.handler.GetAddress(msg.GetAddressRequest)
if err != nil {
Expand Down Expand Up @@ -219,6 +214,18 @@ func (e *EnvelopeConn) HealthStatusRPC() (protos.HealthStatus, error) {
return reply.HealthReport.Status, nil
}

// GetLoadInfoRPC requests the weavelet to return the latest load information.
func (e *EnvelopeConn) GetLoadInfoRPC() (*protos.WeaveletLoadReport, error) {
reply, err := e.rpc(&protos.EnvelopeMsg{SendLoadInfo: true})
if err != nil {
return nil, err
}
if reply.LoadReport == nil {
return nil, fmt.Errorf("nil load info reply received from weavelet")
}
return reply.LoadReport, nil
}

// DoProfilingRPC requests the weavelet to profile itself and return its
// profile data.
func (e *EnvelopeConn) DoProfilingRPC(req *protos.RunProfiling) (*protos.Profile, error) {
Expand Down
22 changes: 20 additions & 2 deletions internal/envelope/conn/weavelet_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,17 @@ import (
"github.com/ServiceWeaver/weaver/runtime/protos"
)

// WeaveletHandler implements the weavelet side processing of messages exchanged
// with the corresponding envelope.
type WeaveletHandler interface {
// CollectLoad returns the latest load information at the weavelet.
CollectLoad() (*protos.WeaveletLoadReport, error)
}

// WeaveletConn is the weavelet side of the connection between a weavelet
// and its envelope. It communicates with the envelope over a pair of pipes.
type WeaveletConn struct {
handler WeaveletHandler
conn conn
wlet *protos.WeaveletInfo
metrics metrics.Exporter
Expand All @@ -41,8 +49,11 @@ type WeaveletConn struct {
//
// NewWeaveletConn blocks until it receives a protos.Weavelet from the
// envelope.
func NewWeaveletConn(r io.ReadCloser, w io.WriteCloser) (*WeaveletConn, error) {
d := &WeaveletConn{conn: conn{name: "weavelet", reader: r, writer: w}}
func NewWeaveletConn(r io.ReadCloser, w io.WriteCloser, h WeaveletHandler) (*WeaveletConn, error) {
d := &WeaveletConn{
handler: h,
conn: conn{name: "weavelet", reader: r, writer: w},
}

// Block until a weavelet is received.
msg := &protos.EnvelopeMsg{}
Expand Down Expand Up @@ -100,6 +111,13 @@ func (d *WeaveletConn) handleMessage(msg *protos.EnvelopeMsg) error {
return d.send(&protos.WeaveletMsg{Id: -msg.Id, Metrics: update})
case msg.SendHealthStatus:
return d.send(&protos.WeaveletMsg{Id: -msg.Id, HealthReport: &protos.HealthReport{Status: protos.HealthStatus_HEALTHY}})
case msg.SendLoadInfo:
id := msg.Id
load, err := d.handler.CollectLoad()
if err != nil {
return d.send(&protos.WeaveletMsg{Id: -id, Error: err.Error()})
}
return d.send(&protos.WeaveletMsg{Id: -id, LoadReport: load})
case msg.RunProfiling != nil:
// This is a blocking call, and therefore we process it in a separate
// goroutine. Note that this will cause profiling requests to be
Expand Down
4 changes: 2 additions & 2 deletions remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,13 @@ type remoteEnv struct {

var _ env = &remoteEnv{}

func newRemoteEnv(ctx context.Context, bootstrap runtime.Bootstrap) (*remoteEnv, error) {
func newRemoteEnv(ctx context.Context, bootstrap runtime.Bootstrap, handler WeaveletHandler) (*remoteEnv, error) {
// Create pipe to communicate with the envelope.
toWeavelet, toEnvelope, err := bootstrap.MakePipes()
if err != nil {
return nil, err
}
conn, err := conn.NewWeaveletConn(toWeavelet, toEnvelope)
conn, err := conn.NewWeaveletConn(toWeavelet, toEnvelope, handler)
if err != nil {
return nil, fmt.Errorf("new weavelet conn: %w", err)
}
Expand Down
12 changes: 9 additions & 3 deletions runtime/envelope/envelope.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,6 @@ type EnvelopeHandler interface {
// RegisterReplica registers the given weavelet replica.
RegisterReplica(entry *protos.ReplicaToRegister) error

// ReportLoad reports the given weavelet load information.
ReportLoad(entry *protos.WeaveletLoadReport) error

// GetAddress gets the address a weavelet should listen on for a listener.
GetAddress(req *protos.GetAddressRequest) (*protos.GetAddressReply, error)

Expand Down Expand Up @@ -338,6 +335,15 @@ func (e *Envelope) ReadMetrics() ([]*metrics.MetricSnapshot, error) {
return conn.GetMetricsRPC()
}

// GetLoadInfo returns the latest load information at the weavelet.
func (e *Envelope) GetLoadInfo() (*protos.WeaveletLoadReport, error) {
conn := e.getConn()
if conn == nil {
return nil, fmt.Errorf("cannot read load: weavelet pipe is down")
}
return conn.GetLoadInfoRPC()
}

func (e *Envelope) isStopped() bool {
e.mu.Lock()
defer e.mu.Unlock()
Expand Down
3 changes: 1 addition & 2 deletions runtime/envelope/envelope_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,6 @@ func (h *handlerForTest) getTraceSpanNames() []string {
func (h *handlerForTest) RecvLogEntry(entry *protos.LogEntry) { h.logSaver(entry) }
func (h *handlerForTest) StartComponent(*protos.ComponentToStart) error { return nil }
func (h *handlerForTest) RegisterReplica(*protos.ReplicaToRegister) error { return nil }
func (h *handlerForTest) ReportLoad(*protos.WeaveletLoadReport) error { return nil }
func (h *handlerForTest) GetRoutingInfo(*protos.GetRoutingInfo) (*protos.RoutingInfo, error) {
return nil, nil
}
Expand Down Expand Up @@ -310,7 +309,7 @@ func createWeaveletConn() (*conn.WeaveletConn, error) {
if err != nil {
return nil, fmt.Errorf("unable make weavelet<->envelope pipes: %w", err)
}
return conn.NewWeaveletConn(toWeavelet, toEnvelope)
return conn.NewWeaveletConn(toWeavelet, toEnvelope, nil /*handler*/)
}

func writeTraces(conn *conn.WeaveletConn) error {
Expand Down
Loading

0 comments on commit ff5ec4f

Please sign in to comment.