Skip to content

Commit

Permalink
fix: Make ClusterWorkflowTemplate optional for namespaced Installation (
Browse files Browse the repository at this point in the history
  • Loading branch information
sarabala1979 committed Apr 19, 2020
1 parent 25c6246 commit 06c4bd6
Show file tree
Hide file tree
Showing 8 changed files with 144 additions and 21 deletions.
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -755,6 +755,7 @@ k8s.io/utils v0.0.0-20191218082557-f07c713de883/go.mod h1:sZAwmy6armz5eXlNoLmJcl
rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=
rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0=
rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA=
sigs.k8s.io/structured-merge-diff v0.0.0-20190525122527-15d366b2352e h1:4Z09Hglb792X0kfOBBJUPFEyvVfQWrYT/l8h5EKA6JQ=
sigs.k8s.io/structured-merge-diff v0.0.0-20190525122527-15d366b2352e/go.mod h1:wWxsB5ozmmv/SG7nM11ayaAW51xMvak/t1r0CSlcokI=
sigs.k8s.io/yaml v1.1.0 h1:4A07+ZFc2wgJwo8YNlQpr1rVlgUDlxXHhPJciaPY5gs=
sigs.k8s.io/yaml v1.1.0/go.mod h1:UJmg0vDUVViEyp3mgSv9WPwZCDxu4rQW1olrI1uml+o=
Expand Down
20 changes: 3 additions & 17 deletions server/auth/authorizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,28 +3,14 @@ package auth
import (
"context"

log "github.com/sirupsen/logrus"
authorizationv1 "k8s.io/api/authorization/v1"
authUtil "github.com/argoproj/argo/util/auth"
)

func CanI(ctx context.Context, verb, resource, namespace, name string) (bool, error) {
kubeClientset := GetKubeClient(ctx)
logCtx := log.WithFields(log.Fields{"verb": verb, "resource": resource, "namespace": namespace, "name": name})
logCtx.Debug("CanI")
review, err := kubeClientset.AuthorizationV1().SelfSubjectAccessReviews().Create(&authorizationv1.SelfSubjectAccessReview{
Spec: authorizationv1.SelfSubjectAccessReviewSpec{
ResourceAttributes: &authorizationv1.ResourceAttributes{
Namespace: namespace,
Verb: verb,
Group: "argoproj.io",
Resource: resource,
Name: name,
},
},
})
allowed, err := authUtil.CanI(kubeClientset, verb, resource, namespace, name)
if err != nil {
return false, err
}
logCtx.WithField("status", review.Status).Debug("CanI")
return review.Status.Allowed, nil
return allowed, nil
}
28 changes: 28 additions & 0 deletions util/auth/auth.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package auth

import (
log "github.com/sirupsen/logrus"
auth "k8s.io/api/authorization/v1"
"k8s.io/client-go/kubernetes"
)

func CanI(kubeclientset kubernetes.Interface, verb, resource, namespace, name string) (bool, error) {
logCtx := log.WithFields(log.Fields{"verb": verb, "resource": resource, "namespace": namespace, "name": name})
logCtx.Debug("CanI")

review, err := kubeclientset.AuthorizationV1().SelfSubjectAccessReviews().Create(&auth.SelfSubjectAccessReview{
Spec: auth.SelfSubjectAccessReviewSpec{
ResourceAttributes: &auth.ResourceAttributes{
Namespace: namespace,
Verb: verb,
Group: "argoproj.io",
Resource: resource,
},
},
})
if err != nil {
return false, err
}
logCtx.WithField("status", review.Status).Debug("CanI")
return review.Status.Allowed, nil
}
35 changes: 35 additions & 0 deletions util/auth/auth_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package auth

import (
"reflect"
"testing"

"github.com/stretchr/testify/assert"
authorizationv1 "k8s.io/api/authorization/v1"
"k8s.io/apimachinery/pkg/runtime"
kubefake "k8s.io/client-go/kubernetes/fake"
k8stesting "k8s.io/client-go/testing"
)

func TestCanI(t *testing.T) {
kubeClient := &kubefake.Clientset{}

kubeClient.AddReactor("create", "selfsubjectaccessreviews", func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) {
selfSubjectAccessReview := reflect.ValueOf(action).FieldByName("Object").Elem().Elem().Field(2).Field(0).Elem()
resource := selfSubjectAccessReview.FieldByName("Resource").String()
verb := selfSubjectAccessReview.FieldByName("Verb").String()
allowed := resource == "workflow" && verb == "get"
return true, &authorizationv1.SelfSubjectAccessReview{
Status: authorizationv1.SubjectAccessReviewStatus{Allowed: allowed},
}, nil
})

allowed, err := CanI(kubeClient, "get", "workflow", "", "")
if assert.NoError(t, err) {
assert.True(t, allowed)
}
notAllowed, err := CanI(kubeClient, "list", "workflow", "", "")
if assert.NoError(t, err) {
assert.False(t, notAllowed)
}
}
30 changes: 27 additions & 3 deletions workflow/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,16 @@ import (
"k8s.io/client-go/util/workqueue"
"upper.io/db.v3/lib/sqlbuilder"

"github.com/argoproj/pkg/errors"

"github.com/argoproj/argo"
"github.com/argoproj/argo/config"
"github.com/argoproj/argo/persist/sqldb"
wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
wfclientset "github.com/argoproj/argo/pkg/client/clientset/versioned"
wfextv "github.com/argoproj/argo/pkg/client/informers/externalversions"
wfextvv1alpha1 "github.com/argoproj/argo/pkg/client/informers/externalversions/workflow/v1alpha1"
authutil "github.com/argoproj/argo/util/auth"
"github.com/argoproj/argo/workflow/common"
"github.com/argoproj/argo/workflow/cron"
"github.com/argoproj/argo/workflow/metrics"
Expand Down Expand Up @@ -171,7 +174,6 @@ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, podWorkers in
wfc.incompleteWfInformer = util.NewWorkflowInformer(wfc.restConfig, wfc.GetManagedNamespace(), workflowResyncPeriod, wfc.incompleteWorkflowTweakListOptions)
wfc.completedWfInformer = util.NewWorkflowInformer(wfc.restConfig, wfc.GetManagedNamespace(), workflowResyncPeriod, wfc.completedWorkflowTweakListOptions)
wfc.wftmplInformer = wfc.newWorkflowTemplateInformer()
wfc.cwftmplInformer = wfc.newClusterWorkflowTemplateInformer()

wfc.addWorkflowInformerHandler()
wfc.podInformer = wfc.newPodInformer()
Expand All @@ -180,14 +182,15 @@ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, podWorkers in
go wfc.incompleteWfInformer.Run(ctx.Done())
go wfc.completedWfInformer.Run(ctx.Done())
go wfc.wftmplInformer.Informer().Run(ctx.Done())
go wfc.cwftmplInformer.Informer().Run(ctx.Done())
go wfc.podInformer.Run(ctx.Done())
go wfc.podLabeler(ctx.Done())
go wfc.podGarbageCollector(ctx.Done())
go wfc.periodicWorkflowGarbageCollector(ctx.Done())

wfc.createClusterWorkflowTemplateInformer(ctx)

// Wait for all involved caches to be synced, before processing items from the queue is started
for _, informer := range []cache.SharedIndexInformer{wfc.incompleteWfInformer, wfc.wftmplInformer.Informer(), wfc.cwftmplInformer.Informer(), wfc.podInformer} {
for _, informer := range []cache.SharedIndexInformer{wfc.incompleteWfInformer, wfc.wftmplInformer.Informer(), wfc.podInformer} {
if !cache.WaitForCacheSync(ctx.Done(), informer.HasSynced) {
log.Error("Timed out waiting for caches to sync")
return
Expand All @@ -203,6 +206,27 @@ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, podWorkers in
<-ctx.Done()
}

// Check if the controller has RBAC access to ClusterWorkflowTemplates
func (wfc *WorkflowController) createClusterWorkflowTemplateInformer(ctx context.Context) {
cwftGetAllowed, err := authutil.CanI(wfc.kubeclientset, "get", "clusterworkflowtemplates", wfc.namespace, "")
errors.CheckError(err)
cwftListAllowed, err := authutil.CanI(wfc.kubeclientset, "list", "clusterworkflowtemplates", wfc.namespace, "")
errors.CheckError(err)
cwftWatchAllowed, err := authutil.CanI(wfc.kubeclientset, "watch", "clusterworkflowtemplates", wfc.namespace, "")
errors.CheckError(err)

if cwftGetAllowed && cwftListAllowed && cwftWatchAllowed {
wfc.cwftmplInformer = wfc.newClusterWorkflowTemplateInformer()
go wfc.cwftmplInformer.Informer().Run(ctx.Done())
if !cache.WaitForCacheSync(ctx.Done(), wfc.cwftmplInformer.Informer().HasSynced) {
log.Error("Timed out waiting for caches to sync")
return
}
} else {
log.Warnf("Controller doesn't have RBAC access for ClusterWorkflowTemplates")
}
}

func (wfc *WorkflowController) UpdateConfig() {
config, err := wfc.configController.Get()
if err != nil {
Expand Down
36 changes: 36 additions & 0 deletions workflow/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,13 @@ import (

"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
authorizationv1 "k8s.io/api/authorization/v1"
apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
k8stesting "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/yaml"
Expand Down Expand Up @@ -120,6 +123,7 @@ func newController() (context.CancelFunc, *WorkflowController) {
wftmplInformer: wftmplInformer,
cwftmplInformer: cwftmplInformer,
wfQueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
podQueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
wfArchive: sqldb.NullWorkflowArchive,
Metrics: make(map[string]prometheus.Metric),
}
Expand Down Expand Up @@ -283,3 +287,35 @@ func TestAddingWorkflowDefaultComplexTwo(t *testing.T) {
assert.Contains(t, workflow.Labels, "label")
assert.Contains(t, workflow.Annotations, "annotation")
}

func TestNamespacedController(t *testing.T) {
kubeClient := fake.Clientset{}
allowed := false
kubeClient.AddReactor("create", "selfsubjectaccessreviews", func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) {
return true, &authorizationv1.SelfSubjectAccessReview{
Status: authorizationv1.SubjectAccessReviewStatus{Allowed: allowed},
}, nil
})

_, controller := newController()
controller.kubeclientset = kubernetes.Interface(&kubeClient)
controller.cwftmplInformer = nil
controller.createClusterWorkflowTemplateInformer(context.TODO())
assert.Nil(t, controller.cwftmplInformer)
}

func TestClusterController(t *testing.T) {
kubeClient := fake.Clientset{}
allowed := true
kubeClient.AddReactor("create", "selfsubjectaccessreviews", func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) {
return true, &authorizationv1.SelfSubjectAccessReview{
Status: authorizationv1.SubjectAccessReviewStatus{Allowed: allowed},
}, nil
})

_, controller := newController()
controller.kubeclientset = kubernetes.Interface(&kubeClient)
controller.cwftmplInformer = nil
controller.createClusterWorkflowTemplateInformer(context.TODO())
assert.NotNil(t, controller.cwftmplInformer)
}
8 changes: 7 additions & 1 deletion workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2390,7 +2390,13 @@ func (woc *wfOperationCtx) substituteParamsInVolumes(params map[string]string) e

// createTemplateContext creates a new template context.
func (woc *wfOperationCtx) createTemplateContext(scope wfv1.ResourceScope, resourceName string) (*templateresolution.Context, error) {
ctx := templateresolution.NewContext(woc.controller.wftmplInformer.Lister().WorkflowTemplates(woc.wf.Namespace), woc.controller.cwftmplInformer.Lister(), woc.wf, woc.wf)
var clusterWorkflowTemplateGetter templateresolution.ClusterWorkflowTemplateGetter
if woc.controller.cwftmplInformer != nil {
clusterWorkflowTemplateGetter = woc.controller.cwftmplInformer.Lister()
} else {
clusterWorkflowTemplateGetter = &templateresolution.NullClusterWorkflowTemplateGetter{}
}
ctx := templateresolution.NewContext(woc.controller.wftmplInformer.Lister().WorkflowTemplates(woc.wf.Namespace), clusterWorkflowTemplateGetter, woc.wf, woc.wf)

switch scope {
case wfv1.ResourceScopeNamespaced:
Expand Down
7 changes: 7 additions & 0 deletions workflow/templateresolution/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,13 @@ func WrapClusterWorkflowTemplateInterface(clusterClientset v1alpha1.ClusterWorkf
return &clusterWorkflowTemplateInterfaceWrapper{clientset: clusterClientset}
}

type NullClusterWorkflowTemplateGetter struct{}

func (n *NullClusterWorkflowTemplateGetter) Get(name string) (*wfv1.ClusterWorkflowTemplate, error) {
return nil, errors.Errorf("", "invalid spec: clusterworkflowtemplates.argoproj.io `%s` is "+
"forbidden: User cannot get resource 'clusterworkflowtemplates' in API group argoproj.io at the cluster scope", name)
}

// Get retrieves the WorkflowTemplate of a given name.
func (wrapper *clusterWorkflowTemplateInterfaceWrapper) Get(name string) (*wfv1.ClusterWorkflowTemplate, error) {
return wrapper.clientset.Get(name, metav1.GetOptions{})
Expand Down

0 comments on commit 06c4bd6

Please sign in to comment.