Skip to content

Commit

Permalink
Add validation of workload entry identity (#119)
Browse files Browse the repository at this point in the history
(cherry picked from commit 0658207e10cef9ac564b31567a05750624a54941)
  • Loading branch information
howardjohn committed Jul 19, 2023
1 parent 323ebec commit 0013609
Show file tree
Hide file tree
Showing 6 changed files with 140 additions and 11 deletions.
25 changes: 25 additions & 0 deletions pilot/pkg/autoregistration/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,9 +228,31 @@ func (c *Controller) RegisterWorkload(proxy *model.Proxy, conTime time.Time) err
return nil
}

// ensureProxyCanControlEntry ensures the connected proxy's identity matches that of the WorkloadEntry it is associating with.
func ensureProxyCanControlEntry(proxy *model.Proxy, wle *config.Config) error {
if !features.ValidateWorkloadEntryIdentity {
// Validation disabled, skip
return nil
}
if proxy.VerifiedIdentity == nil {
return fmt.Errorf("registration of WorkloadEntry requires a verified identity")
}
if proxy.VerifiedIdentity.Namespace != wle.Namespace {
return fmt.Errorf("registration of WorkloadEntry namespace mismatch: %q vs %q", proxy.VerifiedIdentity.Namespace, wle.Namespace)
}
spec := wle.Spec.(*v1alpha3.WorkloadEntry)
if spec.ServiceAccount != "" && proxy.VerifiedIdentity.ServiceAccount != spec.ServiceAccount {
return fmt.Errorf("registration of WorkloadEntry service account mismatch: %q vs %q", proxy.VerifiedIdentity.ServiceAccount, spec.ServiceAccount)
}
return nil
}

func (c *Controller) registerWorkload(entryName string, proxy *model.Proxy, conTime time.Time) error {
wle := c.store.Get(gvk.WorkloadEntry, entryName, proxy.Metadata.Namespace)
if wle != nil {
if err := ensureProxyCanControlEntry(proxy, wle); err != nil {
return err
}
lastConTime, _ := time.Parse(timeFormat, wle.Annotations[ConnectedAtAnnotation])
// the proxy has reconnected to another pilot, not belong to this one.
if conTime.Before(lastConTime) {
Expand All @@ -257,6 +279,9 @@ func (c *Controller) registerWorkload(entryName string, proxy *model.Proxy, conT
proxy.ID, proxy.Metadata.Namespace, proxy.Metadata.AutoRegisterGroup)
}
entry := workloadEntryFromGroup(entryName, proxy, groupCfg)
if err := ensureProxyCanControlEntry(proxy, entry); err != nil {
return err
}
setConnectMeta(entry, c.instanceID, conTime)
_, err := c.store.Create(*entry)
if err != nil {
Expand Down
106 changes: 98 additions & 8 deletions pilot/pkg/autoregistration/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"istio.io/istio/pkg/config/schema/gvk"
"istio.io/istio/pkg/keepalive"
"istio.io/istio/pkg/network"
"istio.io/istio/pkg/spiffe"
"istio.io/istio/pkg/test"
"istio.io/istio/pkg/test/util/assert"
"istio.io/istio/pkg/test/util/retry"
Expand Down Expand Up @@ -71,6 +72,39 @@ var (
Spec: tmplA,
Status: nil,
}
wgAWrongNs = config.Config{
Meta: config.Meta{
GroupVersionKind: gvk.WorkloadGroup,
Namespace: "wrong",
Name: "wg-a",
Labels: map[string]string{
"grouplabel": "notonentry",
},
},
Spec: tmplA,
Status: nil,
}
wgWithoutSA = config.Config{
Meta: config.Meta{
GroupVersionKind: gvk.WorkloadGroup,
Namespace: "a",
Name: "wg-b",
Labels: map[string]string{
"grouplabel": "notonentry",
},
},
Spec: &v1alpha3.WorkloadGroup{
Template: &v1alpha3.WorkloadEntry{
Ports: map[string]uint32{"http": 80},
Labels: map[string]string{"app": "a"},
Network: "nw0",
Locality: "reg0/zone0/subzone0",
Weight: 1,
ServiceAccount: "",
},
},
Status: nil,
}
)

func TestNonAutoregisteredWorkloads(t *testing.T) {
Expand Down Expand Up @@ -121,13 +155,13 @@ func TestAutoregistrationLifecycle(t *testing.T) {

n := fakeNode("reg1", "zone1", "subzone1")

p := fakeProxy("1.2.3.4", wgA, "nw1")
p := fakeProxy("1.2.3.4", wgA, "nw1", "sa-a")
p.XdsNode = n

p2 := fakeProxy("1.2.3.4", wgA, "nw2")
p2 := fakeProxy("1.2.3.4", wgA, "nw2", "sa-a")
p2.XdsNode = n

p3 := fakeProxy("1.2.3.5", wgA, "nw1")
p3 := fakeProxy("1.2.3.5", wgA, "nw1", "sa-a")
p3.XdsNode = n

// allows associating a Register call with Unregister
Expand Down Expand Up @@ -209,7 +243,39 @@ func TestAutoregistrationLifecycle(t *testing.T) {
return checkNoEntry(store, wgA, p3)
}, retry.Timeout(time.Until(time.Now().Add(21*features.WorkloadEntryCleanupGracePeriod))))
})
t.Run("unverified client", func(t *testing.T) {
p := fakeProxy("1.2.3.6", wgA, "nw1", "")
p.XdsNode = fakeNode("reg1", "zone1", "subzone1")

// Should fail
assert.Error(t, c1.RegisterWorkload(p, time.Now()))
checkNoEntryOrFail(t, store, wgA, p)
})
t.Run("wrong SA client", func(t *testing.T) {
p := fakeProxy("1.2.3.6", wgA, "nw1", "wrong")
p.XdsNode = fakeNode("reg1", "zone1", "subzone1")

// Should fail
assert.Error(t, c1.RegisterWorkload(p, time.Now()))
checkNoEntryOrFail(t, store, wgA, p)
})
t.Run("wrong NS client", func(t *testing.T) {
p := fakeProxy("1.2.3.6", wgA, "nw1", "sa-a")
p.Metadata.Namespace = "wrong"
p.XdsNode = fakeNode("reg1", "zone1", "subzone1")

// Should fail
assert.Error(t, c1.RegisterWorkload(p, time.Now()))
checkNoEntryOrFail(t, store, wgA, p)
})
t.Run("no SA WG", func(t *testing.T) {
p := fakeProxy("1.2.3.6", wgWithoutSA, "nw1", "sa-a")
p.XdsNode = fakeNode("reg1", "zone1", "subzone1")

// Should fail
assert.NoError(t, c1.RegisterWorkload(p, time.Now()))
checkEntryOrFail(t, store, wgWithoutSA, p, n, c1.instanceID)
})
// TODO test garbage collection if pilot stops before disconnect meta is set (relies on heartbeat)
}

Expand All @@ -218,7 +284,7 @@ func TestUpdateHealthCondition(t *testing.T) {
ig, ig2, store := setup(t)
go ig.Run(stop)
go ig2.Run(stop)
p := fakeProxy("1.2.3.4", wgA, "litNw")
p := fakeProxy("1.2.3.4", wgA, "litNw", "sa-a")
p.XdsNode = fakeNode("reg1", "zone1", "subzone1")
ig.RegisterWorkload(p, time.Now())
t.Run("auto registered healthy health", func(t *testing.T) {
Expand Down Expand Up @@ -261,7 +327,7 @@ func TestWorkloadEntryFromGroup(t *testing.T) {
},
},
}
proxy := fakeProxy("10.0.0.1", group, "nw1")
proxy := fakeProxy("10.0.0.1", group, "nw1", "sa")
proxy.XdsNode = fakeNode("rgn2", "zone2", "subzone2")

wantLabels := map[string]string{
Expand Down Expand Up @@ -310,6 +376,8 @@ func setup(t *testing.T) (*Controller, *Controller, model.ConfigStoreController)
c1 := NewController(store, "pilot-1", keepalive.Infinity)
c2 := NewController(store, "pilot-2", keepalive.Infinity)
createOrFail(t, store, wgA)
createOrFail(t, store, wgAWrongNs)
createOrFail(t, store, wgWithoutSA)
return c1, c2, store
}

Expand Down Expand Up @@ -418,6 +486,23 @@ func checkEntryOrFail(
}
}

func checkNoEntryOrFail(
t test.Failer,
store model.ConfigStoreController,
wg config.Config,
proxy *model.Proxy,
) {
name := wg.Name + "-" + proxy.IPAddresses[0]
if proxy.Metadata.Network != "" {
name += "-" + string(proxy.Metadata.Network)
}

cfg := store.Get(gvk.WorkloadEntry, name, wg.Namespace)
if cfg != nil {
t.Fatalf("workload entry found when it was not expected")
}
}

func checkEntryHealth(store model.ConfigStoreController, proxy *model.Proxy, healthy bool) (err error) {
name := proxy.AutoregisteredWorkloadEntryName
cfg := store.Get(gvk.WorkloadEntry, name, proxy.Metadata.Namespace)
Expand Down Expand Up @@ -464,10 +549,15 @@ func checkHealthOrFail(t test.Failer, store model.ConfigStoreController, proxy *
}
}

func fakeProxy(ip string, wg config.Config, nw network.ID) *model.Proxy {
func fakeProxy(ip string, wg config.Config, nw network.ID, sa string) *model.Proxy {
var id *spiffe.Identity
if wg.Namespace != "" && sa != "" {
id = &spiffe.Identity{Namespace: wg.Namespace, ServiceAccount: sa}
}
return &model.Proxy{
IPAddresses: []string{ip},
Labels: map[string]string{"merge": "me"},
IPAddresses: []string{ip},
Labels: map[string]string{"merge": "me"},
VerifiedIdentity: id,
Metadata: &model.NodeMetadata{
AutoRegisterGroup: wg.Name,
Namespace: wg.Namespace,
Expand Down
5 changes: 5 additions & 0 deletions pilot/pkg/features/pilot.go
Original file line number Diff line number Diff line change
Expand Up @@ -695,6 +695,11 @@ var (
false,
"True if the stats runtime should use the Envoy extension instead of the compiled Wasm extension.",
).Get()

ValidateWorkloadEntryIdentity = env.Register("ISTIO_WORKLOAD_ENTRY_VALIDATE_IDENTITY", true,
"If enabled, will validate the identity of a workload matches the identity of the "+
"WorkloadEntry it is associating with for health checks and auto registration. "+
"This flag is added for backwards compatibility only and will be removed in future releases").Get()
)

// EnableEndpointSliceController returns the value of the feature flag and whether it was actually specified.
Expand Down
4 changes: 4 additions & 0 deletions pilot/pkg/xds/vm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,20 @@ import (
"time"

"istio.io/api/networking/v1alpha3"
"istio.io/istio/pilot/pkg/features"
"istio.io/istio/pilot/pkg/model"
"istio.io/istio/pkg/config"
"istio.io/istio/pkg/config/schema/gvk"
"istio.io/istio/pkg/test"
"istio.io/istio/pkg/test/util/assert"
"istio.io/istio/pkg/test/util/retry"
)

// TestRegistration is an e2e test for registration. Most tests are in autoregister package, but this
// exercises the full XDS flow.
func TestRegistration(t *testing.T) {
// TODO: allow fake XDS to be "authenticated"
test.SetForTest(t, &features.ValidateWorkloadEntryIdentity, false)
ds := NewFakeDiscoveryServer(t, FakeOptions{})
ds.Store().Create(config.Config{
Meta: config.Meta{
Expand Down
8 changes: 5 additions & 3 deletions pkg/adsc/adsc.go
Original file line number Diff line number Diff line change
Expand Up @@ -469,15 +469,17 @@ func (a *ADSC) reconnect() {
a.mutex.RUnlock()

err := a.Run()
if err == nil {
a.cfg.BackoffPolicy.Reset()
} else {
if err != nil {
// TODO: fix reconnect
time.AfterFunc(a.cfg.BackoffPolicy.NextBackOff(), a.reconnect)
}
}

func (a *ADSC) handleRecv() {
// We connected, so reset the backoff
if a.cfg.BackoffPolicy != nil {
a.cfg.BackoffPolicy.Reset()
}
for {
var err error
msg, err := a.stream.Recv()
Expand Down
3 changes: 3 additions & 0 deletions pkg/istio-agent/xds_proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (

extensions "istio.io/api/extensions/v1alpha1"
networking "istio.io/api/networking/v1alpha3"
"istio.io/istio/pilot/pkg/features"
"istio.io/istio/pilot/pkg/model"
"istio.io/istio/pilot/pkg/model/status"
"istio.io/istio/pilot/pkg/util/protoconv"
Expand Down Expand Up @@ -99,6 +100,8 @@ func TestXdsProxyBasicFlow(t *testing.T) {

// Validates the proxy health checking updates
func TestXdsProxyHealthCheck(t *testing.T) {
// TODO: allow fake XDS to be "authenticated"
test.SetForTest(t, &features.ValidateWorkloadEntryIdentity, false)
healthy := &discovery.DiscoveryRequest{TypeUrl: v3.HealthInfoType}
unhealthy := &discovery.DiscoveryRequest{
TypeUrl: v3.HealthInfoType,
Expand Down

0 comments on commit 0013609

Please sign in to comment.