Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Exclude warmed containers in disabled invokers. #5313

Merged
merged 5 commits into from
Aug 22, 2022

Conversation

style95
Copy link
Member

@style95 style95 commented Aug 18, 2022

Description

This is not to send container creation messages to disabled invokers.

Related issue and scope

My changes affect the following components

  • API
  • Controller
  • Message Bus (e.g., Kafka)
  • Loadbalancer
  • Scheduler
  • Invoker
  • Intrinsic actions (e.g., sequences, conductors)
  • Data stores (e.g., CouchDB)
  • Tests
  • Deployment
  • CLI
  • General tooling
  • Documentation

Types of changes

  • Bug fix (generally a non-breaking change which closes an issue).
  • Enhancement or new feature (adds new functionality).
  • Breaking change (a bug fix or enhancement which changes existing behavior).

Checklist:

  • I signed an Apache CLA.
  • I reviewed the style guides and followed the recommendations (Travis CI will check :).
  • I added tests to cover my changes.
  • My changes require further changes to the documentation.
  • I updated the documentation where necessary.

.map(_.split("/").takeRight(3).apply(0))
if (chosenInvoker.nonEmpty) {
creationJobManager ! RegisterCreationJob(msg)
sendCreationContainerToInvoker(messagingProducer, chosenInvoker.get.toInt, msg)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There were side effects in this method like this one.
This method is supposed to filter warmed creations but it also sent creation messages to invokers.
This kind of side effect makes it harder to test this method, I removed side effects.

@@ -360,6 +337,100 @@ object ContainerManager {
*/
def rng(mod: Int): Int = ThreadLocalRandom.current().nextInt(mod)

// Partition messages that can use warmed containers.
// return: (list of messages that cannot use warmed containers, list of messages that can take advantage of warmed containers)
protected[container] def filterWarmedCreations(warmedContainers: Set[String],
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now, this method just partitions the creation messages into two sets, warmed creations, and cold creations.
Since I removed the side effects, this method can be located in the companion object now.

logging.info(this, s"Choose a warmed container $container")

// this is required to exclude already chosen invokers
inProgressWarmedContainers.update(msg.creationId.asString, container)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks like a side effect, but inProgressWarmedContainers is a local variable passed as an argument and only valid in the scope of this method.

.contains(invoker.toInt))

if (chosenInvoker.nonEmpty) {
(msg, Some(chosenInvoker.head.toInt), Some(container.head))
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now, this is an example of the return value.
When there are available warmed creations, it will return tuple3 with the original message, the chosen invoker, and the chosen warmed container key.

The warmed container key is required to update the member variable, inProgressWarmedContainers of this class,

if (instance.id.userMemory.toMB < requiredMemory.megabytes) {
val split = candidates.splitAt(idx)
val _ :: t1 = split._2
chooseInvokerFromCandidates(split._1 ::: t1, msg)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now, this method recurses itself without the invoker with not enough resources.

private def createContainer(msgs: List[ContainerCreationMessage],
memory: ByteSize,
invocationNamespace: String): Unit = {
private def createContainer(msgs: List[ContainerCreationMessage], memory: ByteSize, invocationNamespace: String)(
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now, this method only has side effects.
Since the internal core logic(which is extracted as methods) has no side effects, it is testable and side effects are isolated to this method.


// no matter how many time we schedule the msg, it should always choose invoker2.
(1 to 10).foreach { _ =>
val newPairs = ContainerManager.chooseInvokerFromCandidates(candidates, msg)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can add unit tests for chooseInvokerFromCandidates now.

val msgs = List(msg1, msg2, msg3)

val (coldCreations, warmedCreations) =
ContainerManager.filterWarmedCreations(warmedContainers, inProgressWarmedContainers, invokers, msgs)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can add unit tests for filterWarmedCreations now.

coldCreations.map(_._1).contains(msg3) shouldBe true
}

it should "choose cold creation when warmed containers are in disabled invokers" in {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test will guarantee no disabled invoker is used because of warmed containers.

val warmedApplied = msgs.map { msg =>
val warmedPrefix =
containerPrefix(ContainerKeys.warmedPrefix, msg.invocationNamespace, msg.action, Some(msg.revision))
val container = warmedContainers
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

container -> containers ? because it may has several values in Seq.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I updated the logic to find only one container.

val updatedInvokers = chosenInvokers.foldLeft(invokers) { (invokers, chosenInvoker) =>
chosenInvoker match {
case Some((chosenInvoker, msg)) =>
updateInvokerMemory(chosenInvoker, msg.whiskActionMetaData.limits.memory.megabytes, invokers)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After executed updateInvokerMemory, the invokers's memory info would be synced in all schedulers?

Copy link
Contributor

@ningyougang ningyougang Aug 19, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Due to warmedContainers already existed in invoker side, seems it will just use existing warmed container, it will consume new memory?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After executed updateInvokerMemory, the invokers's memory info would be synced in all schedulers?

It is not synchronized among schedulers.
The actual memory usage is updated by invokers when a new container is created.
If there is not enough memory in an invoker, it will respond with rescheduling ack, and the message is scheduled to another invoker.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So here, update memory in ContainerManager, just improve the performance? can judge in advance?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When multiple messages are scheduled, we need to consider already scheduled memory.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Due to warmedContainers already existed in invoker side, seems it will just use existing warmed container, it will consume new memory?

I confirmed that warmed container memories are not considered as busy memory.

Copy link
Contributor

@ningyougang ningyougang Aug 19, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hm.for coldCreations, i understand why need to updateInvokerMemory(scheduledPair.invokerId, msg.whiskActionMetaData.limits.memory.megabytes, invokers)

But for warmCreations, i don't understand why need to that,
in my understanding, seems has no need to update memory for warmCreations becausee it would not consume memory.

And for coldCreations, i still have some concerns, e.g.
If has a lot of coldCreations, so update memory operation may executes frequently during a very short time, but invoker side, there still exist a lot of
free memory.
I am not sure whether this case would happen.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

becausee it would not consume memory.

First, it "would" consume memory.
When an invoker advertises its memory, it does not include the memory of warmed containers.
So no matter how many warm containers are there, the invoker will advertise full free memory if there is no running container.

Second, when we schedule containers, let's say we need to schedule 5 containers, if the first 3 are scheduled to some invokers, we need to consider the scheduling result to reduce the invokers' free memory when scheduling the rest 2 containers.
Since the getAvailableInvokers method is executed only once for each container creation list, we need to schedule containers based on a snapshot of invoker memory.
But still, it should be updated during scheduling. This is to guarantee at least we don't send a container creation request to an invoker with not enough resources based on a snapshot.
For the next container creation list, it will fetch the actual invoker resources again and do the same.
And even if we here update the invoker resources, the actual invoker resources are only updated by invokers and advertised via ETCD. We here just update the snapshot of invoker resources for the subsequent scheduling.

As you said, there would be other schedulers, and they can also schedule some containers to the same invoker.
But invokers are randomly selected, and it would rarely saturate an invoker due to concurrent requests to the invoker. When there are not enough resources in the whole cluster, that may happen frequently, but if such a situation perpetuates, then that implies we need to scale out our cluster.
Also, even if an invoker without enough resources receives container creation requests, it will return it to reschedule to another invoker.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it

if (candidates.isEmpty) {
ScheduledPair(msg, invokerId = None, Some(NoAvailableInvokersError), Some(s"No available invokers."))
} else if (candidates.forall(p => p.id.userMemory.toMB < requiredMemory.megabytes)) {
ScheduledPair(msg, invokerId = None, Some(NoAvailableInvokersError), Some(s"No available invokers."))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems case1 and case2's reason is same No available invokers
But they has small difference, e.g. case2, No available invokers with lack memory.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So do you suggest adding a new error?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hm...no. i think it is ok.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed the error to ResourceNotEnoughError

@codecov-commenter
Copy link

codecov-commenter commented Aug 19, 2022

Codecov Report

Merging #5313 (f8f9f51) into master (7de8bad) will decrease coverage by 4.71%.
The diff coverage is 88.61%.

@@            Coverage Diff             @@
##           master    #5313      +/-   ##
==========================================
- Coverage   81.13%   76.41%   -4.72%     
==========================================
  Files         239      239              
  Lines       14192    14238      +46     
  Branches      580      599      +19     
==========================================
- Hits        11514    10880     -634     
- Misses       2678     3358     +680     
Impacted Files Coverage Δ
...he/openwhisk/core/invoker/FPCInvokerReactive.scala 0.00% <0.00%> (ø)
.../org/apache/openwhisk/core/connector/Message.scala 70.27% <8.33%> (-2.59%) ⬇️
...sk/core/scheduler/container/ContainerManager.scala 95.83% <98.18%> (-1.06%) ⬇️
...core/database/cosmosdb/RxObservableImplicits.scala 0.00% <0.00%> (-100.00%) ⬇️
...ore/database/cosmosdb/cache/CacheInvalidator.scala 0.00% <0.00%> (-100.00%) ⬇️
...e/database/cosmosdb/cache/ChangeFeedConsumer.scala 0.00% <0.00%> (-100.00%) ⬇️
...core/database/cosmosdb/CosmosDBArtifactStore.scala 0.00% <0.00%> (-95.48%) ⬇️
...sk/core/database/cosmosdb/CosmosDBViewMapper.scala 0.00% <0.00%> (-93.90%) ⬇️
...tabase/cosmosdb/cache/CacheInvalidatorConfig.scala 0.00% <0.00%> (-92.31%) ⬇️
...enwhisk/connector/kafka/KamonMetricsReporter.scala 0.00% <0.00%> (-83.34%) ⬇️
... and 19 more

📣 We’re building smart automated test selection to slash your CI/CD build times. Learn more

warmUp()
complete("Success enable invoker")
}

override def disable(): Route = {
invokerHealthManager ! GracefulShutdown
pool ! GracefulShutdown
consumer.foreach(_.close())
consumer = None
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even if the invoker is disabled, it handles messages that may come in for a while. 👍

@@ -636,6 +636,23 @@ object ContainerMessage extends DefaultJsonProtocol {
sealed trait ContainerCreationError

object ContainerCreationError extends Enumeration {
import scala.language.implicitConversions
implicit def containerCreationErrorToString(x: ContainerCreationError): String = {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ningyougang
Copy link
Contributor

LGTM

@style95 style95 merged commit e36b2d8 into apache:master Aug 22, 2022
@style95
Copy link
Member Author

style95 commented Aug 22, 2022

merged as it's a critical issue that breaks the scheduling fundamental.

msciabarra pushed a commit to nuvolaris/openwhisk that referenced this pull request Nov 23, 2022
* Exclude warmed containers in disabled invokers.

* Exclude warmed containers in disabled invokers.

* Find the first warmed container.

* Remove the code added by mistake.

* Add more logs for error cases.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants