forked from argoproj/argo-workflows
-
Notifications
You must be signed in to change notification settings - Fork 0
/
event_server_test.go
64 lines (49 loc) · 2.1 KB
/
event_server_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
package event
import (
"encoding/json"
"testing"
"github.com/stretchr/testify/assert"
"golang.org/x/net/context"
fakekube "k8s.io/client-go/kubernetes/fake"
eventpkg "github.com/argoproj/argo-workflows/v3/pkg/apiclient/event"
wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo-workflows/v3/pkg/client/clientset/versioned/fake"
"github.com/argoproj/argo-workflows/v3/server/auth"
"github.com/argoproj/argo-workflows/v3/util/instanceid"
"github.com/argoproj/argo-workflows/v3/workflow/events"
)
// TestController drives Controller.ReceiveEvent through three scenarios:
// asynchronous dispatch (including the "operation queue full" backpressure
// error), synchronous dispatch, and synchronous dispatch with a payload that
// cannot be marshalled to JSON.
func TestController(t *testing.T) {
	// The fake workflow clientset is handed to the controller via the context,
	// stored under auth.WfKey.
	wfClient := fake.NewSimpleClientset()
	ctx := context.WithValue(context.TODO(), auth.WfKey, wfClient)

	// These two collaborators are created once and shared by every controller
	// built below.
	idService := instanceid.NewService("my-instanceid")
	recorderManager := events.NewEventRecorderManager(fakekube.NewSimpleClientset())

	// build returns a fresh controller for a sub-test. Both integer arguments
	// are 1 (presumably queue size and worker count — confirm against
	// NewController); the Async sub-test relies on the queue accepting exactly
	// one event before reporting that it is full.
	build := func(async bool) *Controller {
		return NewController(idService, recorderManager, 1, 1, async)
	}

	// Request fixtures reused by the Async and Sync sub-tests.
	nsEvent := &eventpkg.EventRequest{Namespace: "my-ns", Payload: &wfv1.Item{}}
	emptyEvent := &eventpkg.EventRequest{}

	t.Run("Async", func(t *testing.T) {
		check := assert.New(t)
		ctrl := build(true)

		// The first event is accepted and left sitting in the queue.
		_, err := ctrl.ReceiveEvent(ctx, nsEvent)
		check.NoError(err)
		check.Len(ctrl.operationQueue, 1, "one event to be processed")

		// The second event is rejected because the queue is already occupied.
		_, err = ctrl.ReceiveEvent(ctx, emptyEvent)
		check.EqualError(err, "operation queue full", "backpressure when queue is full")

		// The stop channel is buffered and pre-loaded, so the stop signal is
		// already pending when Run is called; the test expects Run to return
		// with nothing left in the queue.
		stop := make(chan struct{}, 1)
		stop <- struct{}{}
		ctrl.Run(stop)
		check.Len(ctrl.operationQueue, 0, "all events were processed")
	})

	t.Run("Sync", func(t *testing.T) {
		check := assert.New(t)
		ctrl := build(false)

		// With synchronous dispatch both requests, including the empty one,
		// are expected to succeed.
		_, err := ctrl.ReceiveEvent(ctx, nsEvent)
		check.NoError(err)

		_, err = ctrl.ReceiveEvent(ctx, emptyEvent)
		check.NoError(err)
	})

	t.Run("SyncError", func(t *testing.T) {
		// "!" is not valid JSON, so the payload fails to marshal and the error
		// is expected to be returned directly from ReceiveEvent.
		const wantErr = "failed to create workflow template expression environment: json: error calling MarshalJSON for type *v1alpha1.Item: invalid character '!' looking for beginning of value"

		badEvent := &eventpkg.EventRequest{
			Namespace: "my-ns",
			Payload:   &wfv1.Item{Value: json.RawMessage("!")},
		}

		ctrl := build(false)
		_, err := ctrl.ReceiveEvent(ctx, badEvent)
		assert.New(t).EqualError(err, wantErr)
	})
}