Skip to content

Commit

Permalink
Remove non transactional state apis from actors. (dapr#1977)
Browse files Browse the repository at this point in the history
* remove non trnsactional state apis from actors.

* fixing tests and removing tests for the apis which are removed.
  • Loading branch information
amanbha authored Aug 28, 2020
1 parent a1affc0 commit 16ce29a
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 541 deletions.
31 changes: 0 additions & 31 deletions pkg/actors/actors.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,6 @@ type Actors interface {
Call(ctx context.Context, req *invokev1.InvokeMethodRequest) (*invokev1.InvokeMethodResponse, error)
Init() error
GetState(ctx context.Context, req *GetStateRequest) (*StateResponse, error)
SaveState(ctx context.Context, req *SaveStateRequest) error
DeleteState(ctx context.Context, req *DeleteStateRequest) error
TransactionalStateOperation(ctx context.Context, req *TransactionalRequest) error
GetReminder(ctx context.Context, req *GetReminderRequest) (*Reminder, error)
CreateReminder(ctx context.Context, req *CreateReminderRequest) error
Expand Down Expand Up @@ -433,35 +431,6 @@ func (a *actorsRuntime) IsActorHosted(ctx context.Context, req *ActorHostedReque
return exists
}

func (a *actorsRuntime) SaveState(ctx context.Context, req *SaveStateRequest) error {
if a.store == nil {
return errors.New("actors: state store does not exist or incorrectly configured")
}
key := a.constructActorStateKey(req.ActorType, req.ActorID, req.Key)

partitionKey := a.constructCompositeKey(a.config.AppID, req.ActorType, req.ActorID)
metadata := map[string]string{metadataPartitionKey: partitionKey}

err := a.store.Set(&state.SetRequest{
Value: req.Value,
Key: key,
Metadata: metadata,
})
return err
}

func (a *actorsRuntime) DeleteState(ctx context.Context, req *DeleteStateRequest) error {
if a.store == nil {
return errors.New("actors: state store does not exist or incorrectly configured")
}
key := a.constructActorStateKey(req.ActorType, req.ActorID, req.Key)

err := a.store.Delete(&state.DeleteRequest{
Key: key,
})
return err
}

func (a *actorsRuntime) constructActorStateKey(actorType, actorID, key string) string {
return a.constructCompositeKey(a.config.AppID, actorType, actorID, key)
}
Expand Down
95 changes: 48 additions & 47 deletions pkg/actors/actors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,16 @@ func (f *fakeStateStore) BulkSet(req []state.SetRequest) error {
}

func (f *fakeStateStore) Multi(request *state.TransactionalStateRequest) error {
for _, o := range request.Operations {
if o.Operation == state.Upsert {
req := o.Request.(state.SetRequest)
b, _ := json.Marshal(req.Value)
f.items[req.Key] = b
} else if o.Operation == state.Delete {
req := o.Request.(state.DeleteRequest)
delete(f.items, req.Key)
}
}
return nil
}

Expand Down Expand Up @@ -589,37 +599,6 @@ func TestConstructActorStateKey(t *testing.T) {
assert.Equal(t, TestKeyName, keys[3])
}

func TestSaveState(t *testing.T) {
testActorRuntime := newTestActorsRuntime()
actorType, actorID := getTestActorTypeAndID()
ctx := context.Background()
fakeData := strconv.Quote("fakeData")

var val interface{}
jsoniter.ConfigFastest.Unmarshal([]byte(fakeData), &val)

// act
fakeCallAndActivateActor(testActorRuntime, actorType, actorID)

err := testActorRuntime.SaveState(ctx, &SaveStateRequest{
ActorID: actorID,
ActorType: actorType,
Key: TestKeyName,
Value: val,
})
assert.NoError(t, err)

// assert
response, err := testActorRuntime.GetState(ctx, &GetStateRequest{
ActorID: actorID,
ActorType: actorType,
Key: TestKeyName,
})

assert.NoError(t, err)
assert.Equal(t, fakeData, string(response.Data))
}

func TestGetState(t *testing.T) {
testActorRuntime := newTestActorsRuntime()
actorType, actorID := getTestActorTypeAndID()
Expand All @@ -631,11 +610,18 @@ func TestGetState(t *testing.T) {

fakeCallAndActivateActor(testActorRuntime, actorType, actorID)

testActorRuntime.SaveState(ctx, &SaveStateRequest{
ActorID: actorID,
testActorRuntime.TransactionalStateOperation(ctx, &TransactionalRequest{
ActorType: actorType,
Key: TestKeyName,
Value: val,
ActorID: actorID,
Operations: []TransactionalOperation{
{
Operation: Upsert,
Request: TransactionalUpsert{
Key: TestKeyName,
Value: val,
},
},
},
})

// act
Expand All @@ -659,41 +645,56 @@ func TestDeleteState(t *testing.T) {
var val interface{}
jsoniter.ConfigFastest.Unmarshal([]byte(fakeData), &val)

// save test state
fakeCallAndActivateActor(testActorRuntime, actorType, actorID)

testActorRuntime.SaveState(ctx, &SaveStateRequest{
ActorID: actorID,
// insert state
testActorRuntime.TransactionalStateOperation(ctx, &TransactionalRequest{
ActorType: actorType,
Key: TestKeyName,
Value: val,
ActorID: actorID,
Operations: []TransactionalOperation{
{
Operation: Upsert,
Request: TransactionalUpsert{
Key: TestKeyName,
Value: val,
},
},
},
})

// make sure that state is stored.
// save state
response, err := testActorRuntime.GetState(ctx, &GetStateRequest{
ActorID: actorID,
ActorType: actorType,
Key: TestKeyName,
})

// make sure that state is stored.
assert.NoError(t, err)
assert.Equal(t, fakeData, string(response.Data))

// act
err = testActorRuntime.DeleteState(ctx, &DeleteStateRequest{
ActorID: actorID,
// delete state
testActorRuntime.TransactionalStateOperation(ctx, &TransactionalRequest{
ActorType: actorType,
Key: TestKeyName,
ActorID: actorID,
Operations: []TransactionalOperation{
{
Operation: Delete,
Request: TransactionalUpsert{
Key: TestKeyName,
},
},
},
})
assert.NoError(t, err)

// assert
// act
response, err = testActorRuntime.GetState(ctx, &GetStateRequest{
ActorID: actorID,
ActorType: actorType,
Key: TestKeyName,
})

// assert
assert.NoError(t, err)
assert.Nil(t, response.Data)
}
Expand Down
105 changes: 0 additions & 105 deletions pkg/http/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,24 +208,12 @@ func (a *api) constructActorEndpoints() []Endpoint {
Version: apiVersionV1,
Handler: a.onDirectActorMessage,
},
{
Methods: []string{fasthttp.MethodPost, fasthttp.MethodPut},
Route: "actors/{actorType}/{actorId}/state/{key}",
Version: apiVersionV1,
Handler: a.onSaveActorState,
},
{
Methods: []string{fasthttp.MethodGet},
Route: "actors/{actorType}/{actorId}/state/{key}",
Version: apiVersionV1,
Handler: a.onGetActorState,
},
{
Methods: []string{fasthttp.MethodDelete},
Route: "actors/{actorType}/{actorId}/state/{key}",
Version: apiVersionV1,
Handler: a.onDeleteActorState,
},
{
Methods: []string{fasthttp.MethodPost, fasthttp.MethodPut},
Route: "actors/{actorType}/{actorId}/reminders/{name}",
Expand Down Expand Up @@ -844,59 +832,6 @@ func (a *api) onDirectActorMessage(reqCtx *fasthttp.RequestCtx) {
respond(reqCtx, statusCode, body)
}

func (a *api) onSaveActorState(reqCtx *fasthttp.RequestCtx) {
if a.actor == nil {
msg := NewErrorResponse("ERR_ACTOR_RUNTIME_NOT_FOUND", "")
respondWithError(reqCtx, 400, msg)
log.Debug(msg)
return
}

actorType := reqCtx.UserValue(actorTypeParam).(string)
actorID := reqCtx.UserValue(actorIDParam).(string)
key := reqCtx.UserValue(stateKeyParam).(string)
body := reqCtx.PostBody()

hosted := a.actor.IsActorHosted(reqCtx, &actors.ActorHostedRequest{
ActorType: actorType,
ActorID: actorID,
})

if !hosted {
msg := NewErrorResponse("ERR_ACTOR_INSTANCE_MISSING", "")
respondWithError(reqCtx, 400, msg)
log.Debug(msg)
return
}

// Deserialize body to validate JSON compatible body
// and remove useless characters before saving
var val interface{}
err := a.json.Unmarshal(body, &val)
if err != nil {
msg := NewErrorResponse("ERR_DESERIALIZE_HTTP_BODY", err.Error())
respondWithError(reqCtx, 400, msg)
log.Debug(msg)
return
}

req := actors.SaveStateRequest{
ActorID: actorID,
ActorType: actorType,
Key: key,
Value: val,
}

err = a.actor.SaveState(reqCtx, &req)
if err != nil {
msg := NewErrorResponse("ERR_ACTOR_STATE_SAVE", err.Error())
respondWithError(reqCtx, 500, msg)
log.Debug(msg)
} else {
respondEmpty(reqCtx, 201)
}
}

func (a *api) onGetActorState(reqCtx *fasthttp.RequestCtx) {
if a.actor == nil {
msg := NewErrorResponse("ERR_ACTOR_RUNTIME_NOT_FOUND", "")
Expand Down Expand Up @@ -925,46 +860,6 @@ func (a *api) onGetActorState(reqCtx *fasthttp.RequestCtx) {
}
}

func (a *api) onDeleteActorState(reqCtx *fasthttp.RequestCtx) {
if a.actor == nil {
msg := NewErrorResponse("ERR_ACTOR_RUNTIME_NOT_FOUND", "")
respondWithError(reqCtx, 400, msg)
log.Debug(msg)
return
}

actorType := reqCtx.UserValue(actorTypeParam).(string)
actorID := reqCtx.UserValue(actorIDParam).(string)
key := reqCtx.UserValue(stateKeyParam).(string)

hosted := a.actor.IsActorHosted(reqCtx, &actors.ActorHostedRequest{
ActorType: actorType,
ActorID: actorID,
})

if !hosted {
msg := NewErrorResponse("ERR_ACTOR_INSTANCE_MISSING", "")
respondWithError(reqCtx, 400, msg)
log.Debug(msg)
return
}

req := actors.DeleteStateRequest{
ActorID: actorID,
ActorType: actorType,
Key: key,
}

err := a.actor.DeleteState(reqCtx, &req)
if err != nil {
msg := NewErrorResponse("ERR_ACTOR_STATE_DELETE", err.Error())
respondWithError(reqCtx, 500, msg)
log.Debug(msg)
} else {
respondEmpty(reqCtx, 200)
}
}

func (a *api) onGetMetadata(reqCtx *fasthttp.RequestCtx) {
temp := make(map[interface{}]interface{})

Expand Down
Loading

0 comments on commit 16ce29a

Please sign in to comment.