Skip to content

Commit

Permalink
Socket disconnect runtime function. (heroiclabs#292)
Browse files Browse the repository at this point in the history
  • Loading branch information
zyro committed Jan 30, 2019
1 parent 273d33f commit ce9efe2
Show file tree
Hide file tree
Showing 18 changed files with 517 additions and 68 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ The format is based on [keep a changelog](http:https://keepachangelog.com) and this pr
## [Unreleased]
### Added
- Additional logging format option for Stackdriver Logging.
- New runtime function to immediately disconnect active sockets.
- New runtime function to kick arbitrary presences from streams.

### Fixed
- Correctly return group user results in Lua runtime listing operation.
Expand Down
5 changes: 3 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func main() {

// Start up server components.
matchmaker := server.NewLocalMatchmaker(startupLogger, config.GetName())
sessionRegistry := server.NewSessionRegistry()
sessionRegistry := server.NewLocalSessionRegistry()
tracker := server.StartLocalTracker(logger, config, sessionRegistry, jsonpbMarshaler)
router := server.NewLocalMessageRouter(sessionRegistry, tracker, jsonpbMarshaler)
leaderboardCache := server.NewLocalLeaderboardCache(logger, startupLogger, db)
Expand All @@ -107,7 +107,8 @@ func main() {
matchRegistry := server.NewLocalMatchRegistry(logger, startupLogger, config, tracker, router, config.GetName())
tracker.SetMatchJoinListener(matchRegistry.Join)
tracker.SetMatchLeaveListener(matchRegistry.Leave)
runtime, err := server.NewRuntime(logger, startupLogger, db, jsonpbMarshaler, jsonpbUnmarshaler, config, socialClient, leaderboardCache, leaderboardRankCache, leaderboardScheduler, sessionRegistry, matchRegistry, tracker, router)
streamManager := server.NewLocalStreamManager(config, sessionRegistry, tracker)
runtime, err := server.NewRuntime(logger, startupLogger, db, jsonpbMarshaler, jsonpbUnmarshaler, config, socialClient, leaderboardCache, leaderboardRankCache, leaderboardScheduler, sessionRegistry, matchRegistry, tracker, streamManager, router)
if err != nil {
startupLogger.Fatal("Failed initializing runtime modules", zap.Error(err))
}
Expand Down
7 changes: 5 additions & 2 deletions runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -757,10 +757,13 @@ type NakamaModule interface {
StreamUserJoin(mode uint8, subject, subcontext, label, userID, sessionID string, hidden, persistence bool, status string) (bool, error)
StreamUserUpdate(mode uint8, subject, subcontext, label, userID, sessionID string, hidden, persistence bool, status string) error
StreamUserLeave(mode uint8, subject, subcontext, label, userID, sessionID string) error
StreamUserKick(mode uint8, subject, subcontext, label string, presence Presence) error
StreamCount(mode uint8, subject, subcontext, label string) (int, error)
StreamClose(mode uint8, subject, subcontext, label string) error
StreamSend(mode uint8, subject, subcontext, label, data string) error
StreamSendRaw(mode uint8, subject, subcontext, label string, msg *rtapi.Envelope) error
StreamSend(mode uint8, subject, subcontext, label, data string, presences []Presence) error
StreamSendRaw(mode uint8, subject, subcontext, label string, msg *rtapi.Envelope, presences []Presence) error

SessionDisconnect(ctx context.Context, sessionID, node string) error

MatchCreate(ctx context.Context, module string, params map[string]interface{}) (string, error)
MatchList(ctx context.Context, limit int, authoritative bool, label string, minSize, maxSize int, query string) ([]*api.Match, error)
Expand Down
2 changes: 1 addition & 1 deletion server/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ type ApiServer struct {
grpcGatewayServer *http.Server
}

func StartApiServer(logger *zap.Logger, startupLogger *zap.Logger, db *sql.DB, jsonpbMarshaler *jsonpb.Marshaler, jsonpbUnmarshaler *jsonpb.Unmarshaler, config Config, socialClient *social.Client, leaderboardCache LeaderboardCache, leaderboardRankCache LeaderboardRankCache, sessionRegistry *SessionRegistry, matchRegistry MatchRegistry, matchmaker Matchmaker, tracker Tracker, router MessageRouter, pipeline *Pipeline, runtime *Runtime) *ApiServer {
func StartApiServer(logger *zap.Logger, startupLogger *zap.Logger, db *sql.DB, jsonpbMarshaler *jsonpb.Marshaler, jsonpbUnmarshaler *jsonpb.Unmarshaler, config Config, socialClient *social.Client, leaderboardCache LeaderboardCache, leaderboardRankCache LeaderboardRankCache, sessionRegistry SessionRegistry, matchRegistry MatchRegistry, matchmaker Matchmaker, tracker Tracker, router MessageRouter, pipeline *Pipeline, runtime *Runtime) *ApiServer {
if config.GetSocket().IdleTimeoutMs > 500 {
// Ensure the GRPC Gateway timeout is just under the idle timeout (if possible) to ensure it has priority.
grpcRuntime.DefaultContextTimeout = time.Duration(config.GetSocket().IdleTimeoutMs-500) * time.Millisecond
Expand Down
4 changes: 2 additions & 2 deletions server/message_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,11 @@ type MessageRouter interface {

type LocalMessageRouter struct {
jsonpbMarshaler *jsonpb.Marshaler
sessionRegistry *SessionRegistry
sessionRegistry SessionRegistry
tracker Tracker
}

func NewLocalMessageRouter(sessionRegistry *SessionRegistry, tracker Tracker, jsonpbMarshaler *jsonpb.Marshaler) MessageRouter {
func NewLocalMessageRouter(sessionRegistry SessionRegistry, tracker Tracker, jsonpbMarshaler *jsonpb.Marshaler) MessageRouter {
return &LocalMessageRouter{
jsonpbMarshaler: jsonpbMarshaler,
sessionRegistry: sessionRegistry,
Expand Down
4 changes: 2 additions & 2 deletions server/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type Pipeline struct {
db *sql.DB
jsonpbMarshaler *jsonpb.Marshaler
jsonpbUnmarshaler *jsonpb.Unmarshaler
sessionRegistry *SessionRegistry
sessionRegistry SessionRegistry
matchRegistry MatchRegistry
matchmaker Matchmaker
tracker Tracker
Expand All @@ -39,7 +39,7 @@ type Pipeline struct {
node string
}

func NewPipeline(logger *zap.Logger, config Config, db *sql.DB, jsonpbMarshaler *jsonpb.Marshaler, jsonpbUnmarshaler *jsonpb.Unmarshaler, sessionRegistry *SessionRegistry, matchRegistry MatchRegistry, matchmaker Matchmaker, tracker Tracker, router MessageRouter, runtime *Runtime) *Pipeline {
func NewPipeline(logger *zap.Logger, config Config, db *sql.DB, jsonpbMarshaler *jsonpb.Marshaler, jsonpbUnmarshaler *jsonpb.Unmarshaler, sessionRegistry SessionRegistry, matchRegistry MatchRegistry, matchmaker Matchmaker, tracker Tracker, router MessageRouter, runtime *Runtime) *Pipeline {
return &Pipeline{
logger: logger,
config: config,
Expand Down
6 changes: 3 additions & 3 deletions server/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ type Runtime struct {
leaderboardResetFunction RuntimeLeaderboardResetFunction
}

func NewRuntime(logger, startupLogger *zap.Logger, db *sql.DB, jsonpbMarshaler *jsonpb.Marshaler, jsonpbUnmarshaler *jsonpb.Unmarshaler, config Config, socialClient *social.Client, leaderboardCache LeaderboardCache, leaderboardRankCache LeaderboardRankCache, leaderboardScheduler LeaderboardScheduler, sessionRegistry *SessionRegistry, matchRegistry MatchRegistry, tracker Tracker, router MessageRouter) (*Runtime, error) {
func NewRuntime(logger, startupLogger *zap.Logger, db *sql.DB, jsonpbMarshaler *jsonpb.Marshaler, jsonpbUnmarshaler *jsonpb.Unmarshaler, config Config, socialClient *social.Client, leaderboardCache LeaderboardCache, leaderboardRankCache LeaderboardRankCache, leaderboardScheduler LeaderboardScheduler, sessionRegistry SessionRegistry, matchRegistry MatchRegistry, tracker Tracker, streamManager StreamManager, router MessageRouter) (*Runtime, error) {
runtimeConfig := config.GetRuntime()
startupLogger.Info("Initialising runtime", zap.String("path", runtimeConfig.Path))

Expand All @@ -389,13 +389,13 @@ func NewRuntime(logger, startupLogger *zap.Logger, db *sql.DB, jsonpbMarshaler *
return nil, err
}

goModules, goRpcFunctions, goBeforeRtFunctions, goAfterRtFunctions, goBeforeReqFunctions, goAfterReqFunctions, goMatchmakerMatchedFunction, goMatchCreateFn, goTournamentEndFunction, goTournamentResetFunction, goLeaderboardResetFunction, goSetMatchCreateFn, goMatchNamesListFn, err := NewRuntimeProviderGo(logger, startupLogger, db, config, socialClient, leaderboardCache, leaderboardRankCache, leaderboardScheduler, sessionRegistry, matchRegistry, tracker, router, runtimeConfig.Path, paths)
goModules, goRpcFunctions, goBeforeRtFunctions, goAfterRtFunctions, goBeforeReqFunctions, goAfterReqFunctions, goMatchmakerMatchedFunction, goMatchCreateFn, goTournamentEndFunction, goTournamentResetFunction, goLeaderboardResetFunction, goSetMatchCreateFn, goMatchNamesListFn, err := NewRuntimeProviderGo(logger, startupLogger, db, config, socialClient, leaderboardCache, leaderboardRankCache, leaderboardScheduler, sessionRegistry, matchRegistry, tracker, streamManager, router, runtimeConfig.Path, paths)
if err != nil {
startupLogger.Error("Error initialising Go runtime provider", zap.Error(err))
return nil, err
}

luaModules, luaRpcFunctions, luaBeforeRtFunctions, luaAfterRtFunctions, luaBeforeReqFunctions, luaAfterReqFunctions, luaMatchmakerMatchedFunction, allMatchCreateFn, luaTournamentEndFunction, luaTournamentResetFunction, luaLeaderboardResetFunction, err := NewRuntimeProviderLua(logger, startupLogger, db, jsonpbMarshaler, jsonpbUnmarshaler, config, socialClient, leaderboardCache, leaderboardRankCache, leaderboardScheduler, sessionRegistry, matchRegistry, tracker, router, goMatchCreateFn, runtimeConfig.Path, paths)
luaModules, luaRpcFunctions, luaBeforeRtFunctions, luaAfterRtFunctions, luaBeforeReqFunctions, luaAfterReqFunctions, luaMatchmakerMatchedFunction, allMatchCreateFn, luaTournamentEndFunction, luaTournamentResetFunction, luaLeaderboardResetFunction, err := NewRuntimeProviderLua(logger, startupLogger, db, jsonpbMarshaler, jsonpbUnmarshaler, config, socialClient, leaderboardCache, leaderboardRankCache, leaderboardScheduler, sessionRegistry, matchRegistry, tracker, streamManager, router, goMatchCreateFn, runtimeConfig.Path, paths)
if err != nil {
startupLogger.Error("Error initialising Lua runtime provider", zap.Error(err))
return nil, err
Expand Down
4 changes: 2 additions & 2 deletions server/runtime_go.go
Original file line number Diff line number Diff line change
Expand Up @@ -1736,10 +1736,10 @@ func (ri *RuntimeGoInitializer) RegisterMatch(name string, fn func(ctx context.C
return nil
}

func NewRuntimeProviderGo(logger, startupLogger *zap.Logger, db *sql.DB, config Config, socialClient *social.Client, leaderboardCache LeaderboardCache, leaderboardRankCache LeaderboardRankCache, leaderboardScheduler LeaderboardScheduler, sessionRegistry *SessionRegistry, matchRegistry MatchRegistry, tracker Tracker, router MessageRouter, rootPath string, paths []string) ([]string, map[string]RuntimeRpcFunction, map[string]RuntimeBeforeRtFunction, map[string]RuntimeAfterRtFunction, *RuntimeBeforeReqFunctions, *RuntimeAfterReqFunctions, RuntimeMatchmakerMatchedFunction, RuntimeMatchCreateFunction, RuntimeTournamentEndFunction, RuntimeTournamentResetFunction, RuntimeLeaderboardResetFunction, func(RuntimeMatchCreateFunction), func() []string, error) {
func NewRuntimeProviderGo(logger, startupLogger *zap.Logger, db *sql.DB, config Config, socialClient *social.Client, leaderboardCache LeaderboardCache, leaderboardRankCache LeaderboardRankCache, leaderboardScheduler LeaderboardScheduler, sessionRegistry SessionRegistry, matchRegistry MatchRegistry, tracker Tracker, streamManager StreamManager, router MessageRouter, rootPath string, paths []string) ([]string, map[string]RuntimeRpcFunction, map[string]RuntimeBeforeRtFunction, map[string]RuntimeAfterRtFunction, *RuntimeBeforeReqFunctions, *RuntimeAfterReqFunctions, RuntimeMatchmakerMatchedFunction, RuntimeMatchCreateFunction, RuntimeTournamentEndFunction, RuntimeTournamentResetFunction, RuntimeLeaderboardResetFunction, func(RuntimeMatchCreateFunction), func() []string, error) {
runtimeLogger := NewRuntimeGoLogger(logger)
env := config.GetRuntime().Environment
nk := NewRuntimeGoNakamaModule(logger, db, config, socialClient, leaderboardCache, leaderboardRankCache, leaderboardScheduler, sessionRegistry, matchRegistry, tracker, router)
nk := NewRuntimeGoNakamaModule(logger, db, config, socialClient, leaderboardCache, leaderboardRankCache, leaderboardScheduler, sessionRegistry, matchRegistry, tracker, streamManager, router)

match := make(map[string]func(ctx context.Context, logger runtime.Logger, db *sql.DB, nk runtime.NakamaModule) (runtime.Match, error), 0)
matchLock := &sync.RWMutex{}
Expand Down
1 change: 0 additions & 1 deletion server/runtime_go_match_core.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ type RuntimeGoMatchCore struct {
ctxCancelFn context.CancelFunc
}

//func NewRuntimeGoMatchCore(logger *zap.Logger, matchRegistry MatchRegistry, tracker Tracker, router MessageRouter, id uuid.UUID, node string, labelUpdateFn RuntimeMatchLabelUpdateFunction, db *sql.DB, env map[string]string, nk runtime.NakamaModule, match runtime.Match) (RuntimeMatchCore, error) {
func NewRuntimeGoMatchCore(logger *zap.Logger, matchRegistry MatchRegistry, router MessageRouter, id uuid.UUID, node string, db *sql.DB, env map[string]string, nk runtime.NakamaModule, match runtime.Match) (RuntimeMatchCore, error) {
ctx, ctxCancelFn := context.WithCancel(context.Background())
ctx = NewRuntimeGoContext(ctx, env, RuntimeExecutionModeMatch, nil, 0, "", "", "", "", "")
Expand Down
116 changes: 110 additions & 6 deletions server/runtime_go_nakama.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,17 +46,18 @@ type RuntimeGoNakamaModule struct {
leaderboardCache LeaderboardCache
leaderboardRankCache LeaderboardRankCache
leaderboardScheduler LeaderboardScheduler
sessionRegistry *SessionRegistry
sessionRegistry SessionRegistry
matchRegistry MatchRegistry
tracker Tracker
streamManager StreamManager
router MessageRouter

node string

matchCreateFn RuntimeMatchCreateFunction
}

func NewRuntimeGoNakamaModule(logger *zap.Logger, db *sql.DB, config Config, socialClient *social.Client, leaderboardCache LeaderboardCache, leaderboardRankCache LeaderboardRankCache, leaderboardScheduler LeaderboardScheduler, sessionRegistry *SessionRegistry, matchRegistry MatchRegistry, tracker Tracker, router MessageRouter) *RuntimeGoNakamaModule {
func NewRuntimeGoNakamaModule(logger *zap.Logger, db *sql.DB, config Config, socialClient *social.Client, leaderboardCache LeaderboardCache, leaderboardRankCache LeaderboardRankCache, leaderboardScheduler LeaderboardScheduler, sessionRegistry SessionRegistry, matchRegistry MatchRegistry, tracker Tracker, streamManager StreamManager, router MessageRouter) *RuntimeGoNakamaModule {
return &RuntimeGoNakamaModule{
logger: logger,
db: db,
Expand All @@ -68,6 +69,7 @@ func NewRuntimeGoNakamaModule(logger *zap.Logger, db *sql.DB, config Config, soc
sessionRegistry: sessionRegistry,
matchRegistry: matchRegistry,
tracker: tracker,
streamManager: streamManager,
router: router,

node: config.GetName(),
Expand Down Expand Up @@ -581,6 +583,42 @@ func (n *RuntimeGoNakamaModule) StreamUserLeave(mode uint8, subject, subcontext,
return nil
}

func (n *RuntimeGoNakamaModule) StreamUserKick(mode uint8, subject, subcontext, label string, presence runtime.Presence) error {
uid, err := uuid.FromString(presence.GetUserId())
if err != nil {
return errors.New("expects valid user id")
}

sid, err := uuid.FromString(presence.GetSessionId())
if err != nil {
return errors.New("expects valid session id")
}

node := presence.GetNodeId()
if node == "" {
node = n.node
}

stream := PresenceStream{
Mode: mode,
Label: label,
}
if subject != "" {
stream.Subject, err = uuid.FromString(subject)
if err != nil {
return errors.New("stream subject must be a valid identifier")
}
}
if subcontext != "" {
stream.Subcontext, err = uuid.FromString(subcontext)
if err != nil {
return errors.New("stream subcontext must be a valid identifier")
}
}

return n.streamManager.UserKick(uid, sid, node, stream)
}

func (n *RuntimeGoNakamaModule) StreamCount(mode uint8, subject, subcontext, label string) (int, error) {
stream := PresenceStream{
Mode: mode,
Expand Down Expand Up @@ -627,7 +665,7 @@ func (n *RuntimeGoNakamaModule) StreamClose(mode uint8, subject, subcontext, lab
return nil
}

func (n *RuntimeGoNakamaModule) StreamSend(mode uint8, subject, subcontext, label, data string) error {
func (n *RuntimeGoNakamaModule) StreamSend(mode uint8, subject, subcontext, label, data string, presences []runtime.Presence) error {
stream := PresenceStream{
Mode: mode,
Label: label,
Expand All @@ -646,6 +684,26 @@ func (n *RuntimeGoNakamaModule) StreamSend(mode uint8, subject, subcontext, labe
}
}

var presenceIDs []*PresenceID
if l := len(presences); l != 0 {
presenceIDs = make([]*PresenceID, 0, l)
for _, presence := range presences {
sessionID, err := uuid.FromString(presence.GetSessionId())
if err != nil {
return errors.New("expects each presence session id to be a valid identifier")
}
node := presence.GetNodeId()
if node == "" {
node = n.node
}

presenceIDs = append(presenceIDs, &PresenceID{
SessionID: sessionID,
Node: node,
})
}
}

streamWire := &rtapi.Stream{
Mode: int32(stream.Mode),
Label: stream.Label,
Expand All @@ -661,12 +719,19 @@ func (n *RuntimeGoNakamaModule) StreamSend(mode uint8, subject, subcontext, labe
// No sender.
Data: data,
}}}
n.router.SendToStream(n.logger, stream, msg)

if len(presenceIDs) == 0 {
// Sending to whole stream.
n.router.SendToStream(n.logger, stream, msg)
} else {
// Sending to a subset of stream users.
n.router.SendToPresenceIDs(n.logger, presenceIDs, true, stream.Mode, msg)
}

return nil
}

func (n *RuntimeGoNakamaModule) StreamSendRaw(mode uint8, subject, subcontext, label string, msg *rtapi.Envelope) error {
func (n *RuntimeGoNakamaModule) StreamSendRaw(mode uint8, subject, subcontext, label string, msg *rtapi.Envelope, presences []runtime.Presence) error {
stream := PresenceStream{
Mode: mode,
Label: label,
Expand All @@ -688,11 +753,50 @@ func (n *RuntimeGoNakamaModule) StreamSendRaw(mode uint8, subject, subcontext, l
return errors.New("expects a valid message")
}

n.router.SendToStream(n.logger, stream, msg)
var presenceIDs []*PresenceID
if l := len(presences); l != 0 {
presenceIDs = make([]*PresenceID, 0, l)
for _, presence := range presences {
sessionID, err := uuid.FromString(presence.GetSessionId())
if err != nil {
return errors.New("expects each presence session id to be a valid identifier")
}
node := presence.GetNodeId()
if node == "" {
node = n.node
}

presenceIDs = append(presenceIDs, &PresenceID{
SessionID: sessionID,
Node: node,
})
}
}

if len(presenceIDs) == 0 {
// Sending to whole stream.
n.router.SendToStream(n.logger, stream, msg)
} else {
// Sending to a subset of stream users.
n.router.SendToPresenceIDs(n.logger, presenceIDs, true, stream.Mode, msg)
}

return nil
}

func (n *RuntimeGoNakamaModule) SessionDisconnect(ctx context.Context, sessionID, node string) error {
sid, err := uuid.FromString(sessionID)
if err != nil {
return errors.New("expects valid session id")
}

if node == "" {
node = n.node
}

return n.sessionRegistry.Disconnect(ctx, sid, node)
}

func (n *RuntimeGoNakamaModule) MatchCreate(ctx context.Context, module string, params map[string]interface{}) (string, error) {
if module == "" {
return "", errors.New("expects module name")
Expand Down
Loading

0 comments on commit ce9efe2

Please sign in to comment.