Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(stf): change router service to extract the router at runtime rather than build time #20724

Merged
merged 15 commits into from
Jun 20, 2024
Prev Previous commit
Next Next commit
fix stf
  • Loading branch information
unknown unknown committed Jun 18, 2024
commit b92eb8c307fc2fc6d7abf20911110f4d44ba356b
21 changes: 8 additions & 13 deletions runtime/v2/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,21 +96,11 @@ func (a *AppBuilder) Build(opts ...AppBuilderOption) (*App, error) {
return nil, err
}

stfMsgHandler, err := a.app.msgRouterBuilder.Build()
if err != nil {
return nil, fmt.Errorf("failed to build STF message handler: %w", err)
}

stfQueryHandler, err := a.app.queryRouterBuilder.Build()
if err != nil {
return nil, fmt.Errorf("failed to build query handler: %w", err)
}

endBlocker, valUpdate := a.app.moduleManager.EndBlock()

a.app.stf = stf.NewSTF[transaction.Tx](
stfMsgHandler,
stfQueryHandler,
stf, err := stf.NewSTF[transaction.Tx](
a.app.msgRouterBuilder,
a.app.queryRouterBuilder,
a.app.moduleManager.PreBlocker(),
a.app.moduleManager.BeginBlock(),
endBlocker,
Expand All @@ -119,6 +109,11 @@ func (a *AppBuilder) Build(opts ...AppBuilderOption) (*App, error) {
a.postTxExec,
a.branch,
)
if err != nil {
return nil, fmt.Errorf("failed to create STF: %w", err)
}

a.app.stf = stf

rs, err := rootstore.CreateRootStore(a.storeOptions)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions runtime/v2/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,8 +211,8 @@ func ProvideEnvironment(logger log.Logger, config *runtimev2.Module, key depinje
EventService: stf.NewEventService(),
GasService: stf.NewGasMeterService(),
HeaderService: stf.HeaderService{},
QueryRouterService: stf.NewQueryRouterService(appBuilder.app.queryRouterBuilder),
MsgRouterService: stf.NewMsgRouterService(appBuilder.app.msgRouterBuilder),
QueryRouterService: stf.NewQueryRouterService(),
MsgRouterService: stf.NewMsgRouterService([]byte(key.Name())),
TransactionService: services.NewContextAwareTransactionService(),
KVStoreService: kvService,
MemStoreService: memKvService,
Expand Down
13 changes: 6 additions & 7 deletions server/v2/stf/core_branch_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,17 @@ import (
"testing"

"github.com/stretchr/testify/require"
"google.golang.org/protobuf/types/known/wrapperspb"

appmodulev2 "cosmossdk.io/core/appmodule/v2"
"cosmossdk.io/core/transaction"
"cosmossdk.io/server/v2/stf/branch"
"cosmossdk.io/server/v2/stf/gas"
"cosmossdk.io/server/v2/stf/mock"
)

func TestBranchService(t *testing.T) {
s := &STF[mock.Tx]{
handleMsg: func(ctx context.Context, msg transaction.Msg) (msgResp transaction.Msg, err error) {
kvSet(t, ctx, "exec")
return nil, nil
},
handleQuery: nil,
doPreBlock: func(ctx context.Context, txs []mock.Tx) error { return nil },
doPreBlock: func(ctx context.Context, txs []mock.Tx) error { return nil },
doBeginBlock: func(ctx context.Context) error {
kvSet(t, ctx, "begin-block")
return nil
Expand All @@ -43,6 +38,10 @@ func TestBranchService(t *testing.T) {
makeGasMeter: gas.DefaultGasMeter,
makeGasMeteredState: gas.DefaultWrapWithGasMeter,
}
addMsgHandlerToSTF(t, s, func(ctx context.Context, msg *wrapperspb.BoolValue) (*wrapperspb.BoolValue, error) {
kvSet(t, ctx, "exec")
return nil, nil
})

makeContext := func() *executionContext {
state := mock.DB()
Expand Down
92 changes: 22 additions & 70 deletions server/v2/stf/core_router_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,120 +2,72 @@ package stf

import (
"context"
"errors"
"fmt"
"reflect"
"strings"

"cosmossdk.io/core/transaction"
"google.golang.org/protobuf/runtime/protoiface"

appmodulev2 "cosmossdk.io/core/appmodule/v2"
"cosmossdk.io/core/router"
)

// NewMsgRouterService implements router.Service.
func NewMsgRouterService(msgRouterBuilder *MsgRouterBuilder) router.Service {
msgRouter, err := msgRouterBuilder.Build()
if err != nil {
panic(fmt.Errorf("cannot create msgRouter: %w", err))
}

return &msgRouterService{
builder: msgRouterBuilder,
handler: msgRouter,
}
func NewMsgRouterService(identity transaction.Identity) router.Service {
return msgRouterService{identity: identity}
}

var _ router.Service = (*msgRouterService)(nil)

type msgRouterService struct {
builder *MsgRouterBuilder
handler appmodulev2.Handler
// TODO(tip): the identity sits here for the purpose of disallowing modules to impersonate others (sudo).
// right now this is not used, but it serves the reminder of something that we should be eventually
// looking into.
identity []byte
Comment on lines +13 to +23
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Refactor suggestion for msgRouterService: Consider using the identity field in future implementations.

The identity field in the msgRouterService struct is currently unused. It's a good practice to avoid leaving unused code as it can lead to confusion and maintenance issues. If there are plans to use this field in the future, consider documenting the intended use more explicitly or implementing a placeholder functionality that can be expanded later.

}

// CanInvoke returns an error if the given message cannot be invoked.
func (m *msgRouterService) CanInvoke(ctx context.Context, typeURL string) error {
if typeURL == "" {
return errors.New("missing type url")
}

typeURL = strings.TrimPrefix(typeURL, "/")
if exists := m.builder.HandlerExists(typeURL); exists {
return fmt.Errorf("unknown request: %s", typeURL)
}

return nil
func (m msgRouterService) CanInvoke(ctx context.Context, typeURL string) error {
return ctx.(*executionContext).msgRouter.CanInvoke(ctx, typeURL)
}

// InvokeTyped execute a message and fill-in a response.
// The response must be known and passed as a parameter.
// Use InvokeUntyped if the response type is not known.
func (m *msgRouterService) InvokeTyped(ctx context.Context, msg, resp protoiface.MessageV1) error {
// see https://github.com/cosmos/cosmos-sdk/pull/20349
panic("not implemented")
func (m msgRouterService) InvokeTyped(ctx context.Context, msg, resp protoiface.MessageV1) error {
return ctx.(*executionContext).msgRouter.InvokeTyped(ctx, msg, resp)
}

// InvokeUntyped execute a message and returns a response.
func (m *msgRouterService) InvokeUntyped(ctx context.Context, msg protoiface.MessageV1) (protoiface.MessageV1, error) {
return m.handler(ctx, msg)
func (m msgRouterService) InvokeUntyped(ctx context.Context, msg protoiface.MessageV1) (protoiface.MessageV1, error) {
return ctx.(*executionContext).msgRouter.InvokeUntyped(ctx, msg)
}

// NewQueryRouterService implements router.Service.
func NewQueryRouterService(queryRouterBuilder *MsgRouterBuilder) router.Service {
return &queryRouterService{
builder: queryRouterBuilder,
}
func NewQueryRouterService() router.Service {
return queryRouterService{}
}

var _ router.Service = (*queryRouterService)(nil)

type queryRouterService struct {
builder *MsgRouterBuilder
handler appmodulev2.Handler
}
type queryRouterService struct{}

// CanInvoke returns an error if the given request cannot be invoked.
func (m *queryRouterService) CanInvoke(ctx context.Context, typeURL string) error {
if typeURL == "" {
return errors.New("missing type url")
}

typeURL = strings.TrimPrefix(typeURL, "/")
if exists := m.builder.HandlerExists(typeURL); exists {
return fmt.Errorf("unknown request: %s", typeURL)
}

return nil
func (m queryRouterService) CanInvoke(ctx context.Context, typeURL string) error {
return ctx.(*executionContext).queryRouter.CanInvoke(ctx, typeURL)
}

// InvokeTyped execute a message and fill-in a response.
// The response must be known and passed as a parameter.
// Use InvokeUntyped if the response type is not known.
func (m *queryRouterService) InvokeTyped(
func (m queryRouterService) InvokeTyped(
ctx context.Context,
req, resp protoiface.MessageV1,
) error {
// TODO lazy initialization is ugly and not thread safe. we don't want to check a mutex on every InvokeTyped either.
if m.handler == nil {
var err error
m.handler, err = m.builder.Build()
if err != nil {
return fmt.Errorf("cannot create queryRouter: %w", err)
}
}
// reflection is required, see https://github.com/cosmos/cosmos-sdk/pull/20349
res, err := m.handler(ctx, req)
if err != nil {
return err
}
reflect.Indirect(reflect.ValueOf(resp)).Set(reflect.Indirect(reflect.ValueOf(res)))
return nil
return ctx.(*executionContext).queryRouter.InvokeTyped(ctx, req, resp)
}

// InvokeUntyped execute a message and returns a response.
func (m *queryRouterService) InvokeUntyped(
func (m queryRouterService) InvokeUntyped(
ctx context.Context,
req protoiface.MessageV1,
) (protoiface.MessageV1, error) {
return m.handler(ctx, req)
return ctx.(*executionContext).queryRouter.InvokeUntyped(ctx, req)
testinginprod marked this conversation as resolved.
Show resolved Hide resolved
}
51 changes: 32 additions & 19 deletions server/v2/stf/stf.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"cosmossdk.io/core/gas"
"cosmossdk.io/core/header"
"cosmossdk.io/core/log"
"cosmossdk.io/core/router"
"cosmossdk.io/core/store"
"cosmossdk.io/core/transaction"
stfgas "cosmossdk.io/server/v2/stf/gas"
Expand All @@ -23,9 +24,10 @@ var Identity = []byte("stf")

// STF is a struct that manages the state transition component of the app.
type STF[T transaction.Tx] struct {
logger log.Logger
handleMsg func(ctx context.Context, msg transaction.Msg) (transaction.Msg, error)
handleQuery func(ctx context.Context, req transaction.Msg) (transaction.Msg, error)
logger log.Logger

msgRouter Router
queryRouter Router

doPreBlock func(ctx context.Context, txs []T) error
doBeginBlock func(ctx context.Context) error
Expand All @@ -42,29 +44,40 @@ type STF[T transaction.Tx] struct {

// NewSTF returns a new STF instance.
func NewSTF[T transaction.Tx](
handleMsg func(ctx context.Context, msg transaction.Msg) (transaction.Msg, error),
handleQuery func(ctx context.Context, req transaction.Msg) (transaction.Msg, error),
msgRouterBuilder *MsgRouterBuilder,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit, can we add the logger here so it closes this as well: #20663

queryRouterBuilder *MsgRouterBuilder,
doPreBlock func(ctx context.Context, txs []T) error,
doBeginBlock func(ctx context.Context) error,
doEndBlock func(ctx context.Context) error,
doTxValidation func(ctx context.Context, tx T) error,
doValidatorUpdate func(ctx context.Context) ([]appmodulev2.ValidatorUpdate, error),
postTxExec func(ctx context.Context, tx T, success bool) error,
branch func(store store.ReaderMap) store.WriterMap,
) *STF[T] {
) (*STF[T], error) {

msgRouter, err := msgRouterBuilder.Build()
if err != nil {
return nil, fmt.Errorf("build msg router: %w", err)
}
queryRouter, err := queryRouterBuilder.Build()
if err != nil {
return nil, fmt.Errorf("build query router: %w", err)
}

return &STF[T]{
handleMsg: handleMsg,
handleQuery: handleQuery,
logger: nil,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

see above

msgRouter: msgRouter,
queryRouter: queryRouter,
doPreBlock: doPreBlock,
doBeginBlock: doBeginBlock,
doEndBlock: doEndBlock,
doTxValidation: doTxValidation,
doValidatorUpdate: doValidatorUpdate,
doTxValidation: doTxValidation,
postTxExec: postTxExec, // TODO
branchFn: branch,
makeGasMeter: stfgas.DefaultGasMeter,
makeGasMeteredState: stfgas.DefaultWrapWithGasMeter,
}
}, nil
}

// DeliverBlock is our state transition function.
Expand Down Expand Up @@ -310,7 +323,7 @@ func (s STF[T]) runTxMsgs(
execCtx.setGasLimit(gasLimit)
for i, msg := range msgs {
execCtx.sender = txSenders[i]
resp, err := s.handleMsg(execCtx, msg)
resp, err := s.msgRouter.InvokeUntyped(execCtx, msg)
if err != nil {
return nil, 0, nil, fmt.Errorf("message execution at index %d failed: %w", i, err)
}
Expand Down Expand Up @@ -346,7 +359,7 @@ func (s STF[T]) runConsensusMessages(
) ([]transaction.Msg, error) {
responses := make([]transaction.Msg, len(messages))
for i := range messages {
resp, err := s.handleMsg(ctx, messages[i])
resp, err := s.msgRouter.InvokeUntyped(ctx, messages[i])
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -498,11 +511,7 @@ func (s STF[T]) Query(
queryCtx := s.makeContext(ctx, nil, queryState, internal.ExecModeSimulate)
queryCtx.setHeaderInfo(hi)
queryCtx.setGasLimit(gasLimit)
return s.handleQuery(queryCtx, req)
}

func (s STF[T]) Message(ctx context.Context, msg transaction.Msg) (response transaction.Msg, err error) {
return s.handleMsg(ctx, msg)
return s.queryRouter.InvokeUntyped(queryCtx, req)
}

// RunWithCtx is made to support genesis, if genesis was just the execution of messages instead
Expand All @@ -521,8 +530,9 @@ func (s STF[T]) RunWithCtx(
// clone clones STF.
func (s STF[T]) clone() STF[T] {
return STF[T]{
handleMsg: s.handleMsg,
handleQuery: s.handleQuery,
logger: s.logger,
msgRouter: s.msgRouter,
queryRouter: s.queryRouter,
doPreBlock: s.doPreBlock,
doBeginBlock: s.doBeginBlock,
doEndBlock: s.doEndBlock,
Expand Down Expand Up @@ -558,6 +568,9 @@ type executionContext struct {
branchFn branchFn
makeGasMeter makeGasMeterFn
makeGasMeteredStore makeGasMeteredStateFn

msgRouter router.Service
queryRouter router.Service
}

// setHeaderInfo sets the header info in the state to be used by queries in the future.
Expand Down
Loading
Loading