K8s shard affinity #415

Open · wants to merge 2 commits into stable/xena-m3 from k8s_shard_affinity

Conversation

@leust leust commented Mar 21, 2023

Instances created by K8S orchestrators (Kubernikus or Gardener) that are part of the same K8S cluster should be scheduled in the same shard, because Kubernetes makes heavy use of volumes and we want to avoid slow attachments due to cross-shard migration of volumes.

(check also the commit message)

@joker-at-work joker-at-work left a comment

We're not currently using the returned tags and metadata to group the VMs, are we?

nova/scheduler/host_manager.py (outdated, resolved)
nova/scheduler/filters/shard_filter.py (outdated, resolved)
@leust
Author

leust commented Mar 27, 2023

We're not currently using the returned tags and metadata to group the VMs, are we?

I am not sure what you mean by grouping. Can you give a bit more context?

@joker-at-work

We're not currently using the returned tags and metadata to group the VMs, are we?

I am not sure what you mean by grouping. Can you give a bit more context?

We want to group by cluster as defined by Gardener/Kubernikus. Currently, if I read it correctly, the code only groups by project-id - which we also need, but only for instances without k8s info. If we know the cluster via metadata/tags, we need to group by that.

@leust
Author

leust commented Mar 27, 2023

We want to group by cluster as defined by Gardener/Kubernikus. Currently

We use the query InstanceList.get_by_filters which is supposed to join the tags or metadata and return only the instances matching those tags or metadata.
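
For illustration, roughly the kind of call meant here (a sketch; filter keys assumed from Nova's standard instance filters, not the exact code in this PR):

# Hypothetical sketch: let the DB layer join tags/metadata so that only
# the instances of the given K8S cluster come back.
from nova import objects

def get_k8s_instances(context, kks_tag=None, gardener_meta=None):
    filters = {'deleted': False}
    if kks_tag:
        filters['tags'] = [kks_tag]          # joins the tags table
    elif gardener_meta:
        filters['metadata'] = gardener_meta  # joins instance_metadata
    return objects.InstanceList.get_by_filters(context, filters=filters)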

@joker-at-work

We want to group by cluster as defined by Gardener/Kubernikus. Currently

We use the query InstanceList.get_by_filters which is supposed to join the tags or metadata and return only the instances matching those tags or metadata.

You're right. I didn't get that part, it seems. Thanks for explaining. Looking good then 👍

@grandchild

Looking good then

@joker-at-work is that an approve, or just a lgtm for this particular part?

@joker-at-work

Since this is a WIP-named PR with a WIP commit-message, I didn't want to review the whole thing deeply, because it didn't look finished. So no, this was not an approval for the whole PR, but just the part Marius and I talked about.

@leust leust force-pushed the k8s_shard_affinity branch 3 times, most recently from 2571454 to 9d433ea on April 18, 2023 12:36
@leust leust changed the title from "[WIP] K8s shard affinity" to "K8s shard affinity" on Apr 18, 2023
@@ -28,6 +31,9 @@
CONF = nova.conf.CONF

_SERVICE_AUTH = None
GARDENER_PREFIX = "kubernetes.io-cluster-shoot--garden--"
KKS_PREFIX = "kubernikus:kluster"
HANA_PREFIX = "hana_"


class ShardFilter(filters.BaseHostFilter):

The comment for the class needs to be updated. In the cpu_info_migration_filter we also explicitly documented that we're implementing filter_all() instead of host_passes().

Comment on lines 172 to 177
Can be overridden in a subclass, if you need to base filtering
decisions on all objects. Otherwise, one can just override
_filter_one() to filter a single object.

This looks like copy-and-paste from the parent class. Maybe rather document why we overrode it?

Comment on lines 178 to 179
for obj in filter_obj_list:
yield obj

This could optionally be yield from filter_obj_list

>>> def a():
...   yield from [1, 2, 3]
... 
>>> for x in a():
...   print(x)
... 
1
2
3

tested on Python 3.8


for obj in filter_obj_list:

In the cpu_info_migration_filter we called obj host_state and I would prefer to have that name here, too, because it better describes what we're handling here.

# We allow any shard in this AZ for the first instance.
return True

k8s_hosts = set([i.host for i in siblings])

There's no need to build the list; sets can be instantiated from iterators, too, i.e. set(i.host for i in siblings) works.

Comment on lines 251 to 268
def _is_hana_flavor(flavor):
    return flavor.name.startswith(HANA_PREFIX)

def _is_same_category(instance, flavor):
    """Check whether instance is from the flavor's family."""
    if _is_hana_flavor(flavor):
        return _is_hana_flavor(instance.flavor)
    return True

siblings = [i for i in k8s_instances
            if _is_same_category(i, spec_obj.flavor)]

if not siblings:
    # This is the first instance of this particular type (HANA).
    # We allow any shard in this AZ for the first instance.
    return True

k8s_hosts = set([i.host for i in siblings])

We compute this for every host even though it's not host-dependent. Could we pass k8s_hosts instead of k8s_instances into the method and compute k8s_hosts in filter_all()?
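
A minimal sketch of that idea (method names taken from this review, exact signatures assumed):

# Sketch: compute the host set once per request in filter_all() and pass it
# into the per-host check instead of rebuilding it for every host.
def filter_all(self, filter_obj_list, spec_obj):
    siblings = self._get_k8s_siblings(spec_obj)   # assumed helper
    k8s_hosts = set(i.host for i in siblings)     # not host-dependent
    for host_state in filter_obj_list:
        if self._host_passes_k8s(host_state, spec_obj, k8s_hosts):
            yield host_state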

Author

Yes, good idea actually.

Comment on lines 270 to 272
return any(agg.name in host_shard_names and
set(agg.hosts) & k8s_hosts
for agg in host_state.aggregates)

You could pass host_shard_aggrs instead of host_shard_names into this method and then write this as

return any(set(aggr.hosts) & k8s_hosts for aggr in host_shard_aggrs)

Comment on lines 128 to 130
# BigVMs are scheduled based on their own rules.
if nova_utils.is_big_vm(spec_obj.memory_mb, spec_obj.flavor):
    return []

hm ... if we ignore big VMs here, but later explicitly handle hana_* flavors ... I mean big VMs are always hana_* flavors

I think we should ignore hana_* flavors here already and handle them explicitly in a follow-up, as we also have a ticket open on handling them differently.

Author

Right now hana_* are forced to their own shard if any is already part of the k8s cluster (see the usage of _is_same_category comparator).

Do you mean to remove that check as well, and simply ignore hana_* for now?

I mean we do not go into the _host_passes_k8s() for hana_* flavors and just handle non-hana VMs here for now, as we will implement something afterwards for hanas. IIRC, we will implement "ignore the shard filter for hana flavors unless the project explicitly wants to be bound to a shard for hanas".
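
Roughly, i.e. a sketch reusing the existing HANA_PREFIX constant (where exactly the early-out lives is an implementation detail):

# Sketch: skip the K8S affinity logic for hana_* flavors for now,
# similar to the BigVM early-out above.
if spec_obj.flavor.name.startswith(HANA_PREFIX):
    return []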

Comment on lines 143 to 145
if instances and spec_obj.availability_zone:
    return [i for i in instances
            if i.availability_zone == spec_obj.availability_zone]

This means we cannot filter by availability_zone in the query?

Author

I think we can filter in the query as well; it will do a regex lookup with LIKE %az% (since I don't see availability_zone in exact_match_filter_names).

hm ... doesn't sound too nice. filtering in Python might then be better

Comment on lines 151 to 179
build_request = BuildRequest.get_by_instance_uuid(
elevated, spec_obj.instance_uuid)

If this is a resize, do we have a BuildRequest? We call this function before we "skip for resize", as we pre-compute things.

@leust
Author

leust commented Jun 6, 2023

TODO: We need to support instances being already spread across shards and need to take the shard where most of the instances live.

@leust leust force-pushed the k8s_shard_affinity branch 2 times, most recently from cb9fbdc to a256459 on June 11, 2023 13:35
Member

@fwiesel fwiesel left a comment

Hi,

I would move this further into the database here as well, since fetching the instance list can be quite expensive even after filtering for AZ and tags: the rows are quite wide and contain a lot of data we actually do not care about (key-data, user-data, etc.).

Since this is spread over multiple database instances, we need to split the query up.

  1. Query the hosts for the instances matching the tag/metadata:

In SQL, the query could look like this:

select i.`host`, count(*) as count
  from tags as t join instances as i on t.resource_id=i.uuid
  where i.deleted=0 and i.availability_zone=$availability_zone and t.tag=$tag
  group by i.`host`
  order by count desc
  limit 1

or for the metadata

select i.`host`, count(*) as count
  from instances as i join instance_metadata as im on im.instance_uuid=i.uuid
  where i.deleted=0 and im.deleted=0 and im.`key`=$key and i.availability_zone=$availability_zone
  group by i.`host`
  order by count desc
  limit 1

You can then build the query with sqlalchemy depending on the given metadata/tags, similar to what you are doing now (see the sketch at the end of this comment).

But since the metadata lives in the cell databases, the queries need to be scattered over all cells,
like _get_instance_group_hosts_all_cells does.

  2. You can then match the host to an aggregate list by simply calling AggregateList.get_by_host.

  3. Then you can get the dominant shard from this list.

Still open is to skip the filter when it isn't a vSphere VM, and to ignore hosts which are not VMware hosts.
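
A rough SQLAlchemy sketch of the tag-based variant (assumes the models/func/sql imports already used in nova's DB API module; not the final implementation):

def k8s_hosts_by_tag(context, tag, availability_zone):
    count_label = func.count('*').label('count')
    query = context.session.query(models.Instance.host, count_label). \
        join(models.Tag, models.Tag.resource_id == models.Instance.uuid). \
        filter(models.Instance.deleted == 0,
               models.Instance.availability_zone == availability_zone,
               models.Tag.tag == tag). \
        group_by(models.Instance.host). \
        order_by(sql.desc(count_label))
    return query.all()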

@@ -28,6 +32,9 @@
CONF = nova.conf.CONF

_SERVICE_AUTH = None
GARDENER_PREFIX = "kubernetes.io-cluster-shoot--garden--"
Member

I checked in eu-de-2, and I see some entries there starting just with kubernetes.io-cluster-shoot--hcm-eng.
I would shorten the prefix check to kubernetes.io-cluster-, as I don't think we gain much by getting more specific.

@fwiesel
Member

fwiesel commented Jul 6, 2023

Extending the queries from (1) by the following should do the trick to only get the VMware hosts:

select
  ...
  join compute_nodes as cn on i.node=cn.hypervisor_hostname
where ...
  and cn.deleted=0 and cn.hypervisor_type='VMware vCenter Server'

@leust leust force-pushed the k8s_shard_affinity branch 2 times, most recently from 5dd9c7b to 867a490 on July 29, 2023 06:12
mock_is_non_vmware_spec.assert_called_once_with(spec_obj)

def _assert_passes(self, host, spec_obj, passes):

Optional: I think self._assert_passes(False, host, spec) is easier to read than self._assert_passes(host, spec, False), because the False basically belongs to the passes. One could even argue that it should be part of the method name.

Comment on lines +419 to +469
self.assertEqual(3, len(result))
self.assertEqual(result[0].host, 'host1')
self.assertEqual(result[1].host, 'host2')
self.assertEqual(result[2].host, 'host3')

Is there a reason not to write self.assertEqual(result, ['host1', 'host2', 'host3']) here?

because there's a host property...

Ah. How did I not see that? :D
So it would have to be self.assertEqual([r.host for r in result], ['host1', 'host2', 'host3']). Yeah, not sure if that's any better.

Comment on lines 118 to 125
instance_uuid=self.fake_build_req.instance_uuid,
flavor=fake_flavor.fake_flavor_obj(
mock.sentinel.ctx, expected_attrs=['extra_specs']))

self.filt_cls._PROJECT_SHARD_CACHE['foo'] = []
self.assertFalse(self.filt_cls.host_passes(host, spec_obj))
self._assert_passes(host, spec_obj, False)

Do you think we need to check if the k8s part of the ShardFilter is called, i.e. what makes sure that _get_k8s_shard() returns None?

Ah, it seems to be ensured by the self.fake_build_req returned in BuildRequest.get_by_instance_uuid. That's ... not right, I guess, because BuildRequest objects should get deleted after the instance is spawned. Therefore, we only test spawns here and not live-migrations or offline migrations.

Comment on lines 175 to 109
if utils.is_non_vmware_spec(spec_obj):
return True
return filter_obj_list

Would we want to log a debug message here that the filter is not applicable for the request because it's non-VMware?
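
For example, something along these lines (sketch, exact wording up to you):

LOG.debug("ShardFilter is not applicable to %(spec_obj)s "
          "(non-VMware request spec), not filtering any hosts.",
          {'spec_obj': spec_obj})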

Comment on lines 228 to 229
return any(host_shard == k8s_shard
for host_shard in host_shard_names)

We do not have a log line here, so we'd only see "project enabled for all shard" or "… found in project" messages from above. In the end, we do not see that the host was rejected due to not matching the k8s shard. I think we should add a log line with the result here.


@require_context
@pick_context_manager_reader_allow_async
def instance_get_host_by_metadata(context, meta_key, meta_value,

Same here: Please rename the function.

def instance_get_host_by_tag(context, tag, filters=None):
    count_label = func.count('*').label('count')
    query = context.session.query(models.Instance, count_label). \
        join(models.Tag, models.Tag.resource_id == models.Instance.uuid)

I think you should be able to use the tags backref from the Tags model directly instead of having to define the join-condition again. Upstream seems to do it like this.

Comment on lines 2103 to 2107
result = query.all()
if result:
    return result[0]
else:
    return None

Optional: You could use query.first() instead. We could also use query.one(), but would have to catch an exception if there is no result, so first() should be better.

return query.first()

As discussed in the last meeting, we will probably need all hosts returned, because we need to find the shard with the most VMs, not the host.

Comment on lines 2127 to 2131
result = query.all()
if result:
    return result[0]
else:
    return None

Optional: Could be return query.first(), too.

@@ -2087,6 +2087,69 @@ def instance_get_active_by_window_joined(context, begin, end=None,
return _instances_fill_metadata(context, query.all(), manual_joins)


@require_context
@pick_context_manager_reader_allow_async
def instance_get_host_by_tag(context, tag, filters=None):

Please add comments stating the purpose of the 3 functions in this file.

Comment on lines 2093 to 2094
"""Get the list of K8S hosts and the number of instances associated to
that K8S running on that host, querying by instances tags.

typo: "associated to that K8S running on that host" should imho be "associated to the K8S cluster running on that host", right? Could also be fine to just remove "that".

Comment on lines 2116 to 2117
"""Get the list of K8S hosts and the number of instances associated to
that K8S running on that host, querying by instances metadata.

typo: same as above

Comment on lines 2100 to 2109
query = context.session.query(models.Instance, count_label). \
    join(models.Instance.tags)
query = _handle_k8s_hosts_query_filters(query, filters)
query = query.filter(models.Instance.deleted == 0,
                     models.Tag.tag == tag)

query = query.group_by(models.Instance.host). \
    order_by(sql.desc(count_label))

return query.all()

We're not querying for models.Instance.host explicitly. How does it work that we only get models.Instance.host in the end? The other query in the function down below explicitly queries for models.Instance.host.
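
Presumably the explicit variant would look like this (sketch, mirroring the other query in this file):

query = context.session.query(models.Instance.host, count_label). \
    join(models.Instance.tags)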

Comment on lines 2158 to 2160
if hv_type:
    query = query.filter(models.ComputeNode.deleted == 0,
                         models.ComputeNode.hypervisor_type == hv_type)

Q: Why are these down here (I was confused, because I thought the hv_type was fully handled above)? Do all joins need to happen first before we can start filtering? Have you thought about adding these filters to the join condition?

Comment on lines 131 to 132
Returns None if the request is not for an instance that's part of
a K8S cluster, or if this is the first instance of a new cluster.

... or if it's a HANA flavor or if it's a resize

return build_request.instance.metadata if build_request \
else instance.metadata

check_type = spec_obj.get_scheduler_hint('_nova_check_type')

Is this safe i.e. do we have a build request every time we do not set _nova_check_type? I see resize, live-migrate and rebuild setting it, but e.g. not unshelve.

Would it make sense to try and get the BuildRequest if we don't have a check_type, but fall back to getting the instance instead?
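
One possible shape for that fallback (a sketch; assumes Instance and exception.BuildRequestNotFound from nova.objects / nova.exception are available in this module):

# Sketch: prefer the BuildRequest (initial spawn) and fall back to the
# Instance for moves/unshelve, where the BuildRequest no longer exists.
try:
    build_request = BuildRequest.get_by_instance_uuid(
        elevated, spec_obj.instance_uuid)
    metadata = build_request.instance.metadata
except exception.BuildRequestNotFound:
    instance = Instance.get_by_uuid(
        elevated, spec_obj.instance_uuid, expected_attrs=['metadata'])
    metadata = instance.metadata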

Comment on lines 189 to 197
k8s_hosts = {}

for cell_result in results.values():
    if not nova_context.is_cell_failure_sentinel(cell_result):
        cell_hosts = dict(cell_result)
        k8s_hosts = {
            h: k8s_hosts.get(h, 0) + cell_hosts.get(h, 0)
            for h in set(cell_hosts) | set(k8s_hosts)
        }

Suggestion:
With a defaultdict, this could look like:

k8s_hosts = defaultdict(lambda: 0)  # needs: from collections import defaultdict
for cell_result in results.values():
    if nova_context.is_cell_failure_sentinel(cell_result):
        continue
    cell_hosts = dict(cell_result)  # rows of (host, count) from this cell
    for h, c in cell_hosts.items():
        k8s_hosts[h] += c

I think this would be easier to read.

Additionally: Is there a way for a host to turn up in multiple cells? If not, shouldn't we be able to use k8s_hosts.update(cell_hosts) after checking that it's not a failure?

Additionally: Should we really schedule if we have a failure? The majority of the cluster might reside in the failed shard.

Comment on lines 272 to 274
matches = any(host_shard == k8s_shard
for host_shard in host_shard_names)
if not matches:

Couldn't we use if k8s_shard in host_shard_names here? Would be much shorter.
We'd need to explicitly return False and return True then, but I don't see that as a downside.
As an alternative, we could do matches = k8s_shard in host_shard_names, too.

Comment on lines 275 to 285
LOG.debug("%(host_state)s is not part of the requested "
"K8S cluster shard '%(k8s_shard)s'",

typo: since the user doesn't request a shard but a k8s cluster, imho this needs to be "requested K8S cluster's shard"

I'm also wondering if we shouldn't just ditch "requested" here and make it "is not part of the K8S cluster's shard"

@leust leust force-pushed the k8s_shard_affinity branch 2 times, most recently from 112e9f6 to ad10664 on September 25, 2023 18:47
Member

@fwiesel fwiesel left a comment

Maybe I am missing something, but how do we now migrate a cluster out of a shard?

In my memory, we set the sharding on the project level, which ensures that new instances are on the new shard, and then we migrate the vms over.

But I don't think that is possible with the proposed code.

My memory is a bit vague here, though. Am I mistaken about how we wanted to migrate things, or about how it is supposed to work with the code?

filters = {
    'hv_type': 'The hypervisor_type',
    'availability_zone': 'The availability zone'
}
Member

Missing skip_instance_uuid in the documentation

if not instance and not build_request:
    LOG.warning("There were no build_request and no instance "
                "for the uuid %s", spec_obj.instance_uuid)
    return
Member

return None, so that all paths return something explicitly.

Instances that are part of the same K8S cluster will get scheduled
to the same shard (vCenter).

It identifies the K8S cluster by looking at the tags or metadata
set by the k8s cluster orchestrators when creating the instances.
Kubernikus and Gardener are supported for now.

It queries the database to determine the dominant shard, by looking
which shard contains the most instances of a given K8S cluster.

BigVMs are "out of the picture" and should not adhere to shards.
They are only scheduled on their allocated hosts.

The K8S logic is skipped for offline migrations (and thus for
resizes too) since offline migration is a non-usecase for K8S.

Change-Id: I73d04ba295d23db1d4728e9db124fc2a27c2d4bc
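
The "dominant shard" selection described in the commit message above could look roughly like this (a sketch with assumed names; the DB queries return (host, count) pairs for the cluster):

# Sketch: pick the shard aggregate that hosts most of the cluster's instances.
def _get_dominant_shard(shard_aggrs, k8s_hosts):
    """shard_aggrs: shard aggregates in the AZ; k8s_hosts: {host: count}."""
    return max(shard_aggrs,
               key=lambda aggr: sum(k8s_hosts.get(h, 0) for h in aggr.hosts))
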
In VMware we need to handle race conditions when spawning k8s
instances in parallel while building a new cluster.
Similar to how server groups are validated prior to spawning the
VM on the compute host, we add a new method on the driver,
`validate_instance_group_policy`, that checks driver-specific
grouping policy (in this case, the K8S shard for the instance).

Change-Id: I04151875fae44b72be52127e3b160f7f95abfb9e

@joker-at-work joker-at-work left a comment

In addition to Fabian's comments, I found these

for aggr in k8s_shard_aggrs)
if not matches:
msg = ("Host %(host)s rejected K8S instance %(instance_uuid)s "
"because the K8S cluster is not part to this shard."

typo: "part to" -> "part of"

for aggr in k8s_shard_aggrs)
if not matches:
msg = ("Host %(host)s rejected K8S instance %(instance_uuid)s "
"because the K8S cluster is not part to this shard."

What does "this shard" mean in this context? Shouldn't that be "any already used shards"?

Keep this check in mind for the live-migration with target shard, because this check would probably prevent it.

Comment on lines +1810 to +1816
def validate_instance_group_policy(self, context, instance):
    """Validates that the instance meets driver-specific grouping policy

    The driver can raise exception.RescheduledException to reject and
    trigger rescheduling of the instance to a different host.
    """
    pass

instance_group being the name for server groups inside Nova, this function name implies a relation to server groups while it doesn't have any.

I see that you tried to keep it generic, but maybe we need to name it "validate_k8s_shard" after all :/

Comment on lines +58 to +60
results = nova_context.scatter_gather_skip_cell0(
context, ComputeNodeList.get_k8s_hosts_by_instances_tag,
kks_tag, filters=q_filters)

Why do we have to do that in all cells? Is that for non-AZ cells?

Comment on lines +1736 to +1743
def _validate_driver_instance_group_policy(self, context, instance):
    lock_id = "driver-instance-group-validation-%s" % instance.uuid

    @utils.synchronized(lock_id)
    def _do_validation(context, instance):
        self.driver.validate_instance_group_policy(context, instance)

    _do_validation(context, instance)

A little docstring would be nice, e.g. why we need a lock here.

Comment on lines +2164 to +2165
query.filter(
models.Instance.uuid != skip_instance_uuid)

Missing a query =
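
i.e. presumably:

query = query.filter(
    models.Instance.uuid != skip_instance_uuid)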
