-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Exclude warmed containers in disabled invokers. #5313
Conversation
85b92cb
to
f59e41f
Compare
.map(_.split("/").takeRight(3).apply(0)) | ||
if (chosenInvoker.nonEmpty) { | ||
creationJobManager ! RegisterCreationJob(msg) | ||
sendCreationContainerToInvoker(messagingProducer, chosenInvoker.get.toInt, msg) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There were side effects in this method like this one.
This method is supposed to filter warmed creations but it also sent creation messages to invokers.
This kind of side effect makes it harder to test this method, I removed side effects.
@@ -360,6 +337,100 @@ object ContainerManager { | |||
*/ | |||
def rng(mod: Int): Int = ThreadLocalRandom.current().nextInt(mod) | |||
|
|||
// Partition messages that can use warmed containers. | |||
// return: (list of messages that cannot use warmed containers, list of messages that can take advantage of warmed containers) | |||
protected[container] def filterWarmedCreations(warmedContainers: Set[String], |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Now, this method just partitions the creation messages into two sets, warmed creations, and cold creations.
Since I removed the side effects, this method can be located in the companion object now.
logging.info(this, s"Choose a warmed container $container") | ||
|
||
// this is required to exclude already chosen invokers | ||
inProgressWarmedContainers.update(msg.creationId.asString, container) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks like a side effect, but inProgressWarmedContainers
is a local variable passed as an argument and only valid in the scope of this method.
...cheduler/src/main/scala/org/apache/openwhisk/core/scheduler/container/ContainerManager.scala
Show resolved
Hide resolved
.contains(invoker.toInt)) | ||
|
||
if (chosenInvoker.nonEmpty) { | ||
(msg, Some(chosenInvoker.head.toInt), Some(container.head)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Now, this is an example of the return value.
When there are available warmed creations, it will return tuple3 with the original message, the chosen invoker, and the chosen warmed container key.
The warmed container key is required to update the member variable, inProgressWarmedContainers
of this class,
if (instance.id.userMemory.toMB < requiredMemory.megabytes) { | ||
val split = candidates.splitAt(idx) | ||
val _ :: t1 = split._2 | ||
chooseInvokerFromCandidates(split._1 ::: t1, msg) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Now, this method recurses itself without the invoker with not enough resources.
private def createContainer(msgs: List[ContainerCreationMessage], | ||
memory: ByteSize, | ||
invocationNamespace: String): Unit = { | ||
private def createContainer(msgs: List[ContainerCreationMessage], memory: ByteSize, invocationNamespace: String)( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Now, this method only has side effects.
Since the internal core logic(which is extracted as methods) has no side effects, it is testable and side effects are isolated to this method.
|
||
// no matter how many time we schedule the msg, it should always choose invoker2. | ||
(1 to 10).foreach { _ => | ||
val newPairs = ContainerManager.chooseInvokerFromCandidates(candidates, msg) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can add unit tests for chooseInvokerFromCandidates
now.
val msgs = List(msg1, msg2, msg3) | ||
|
||
val (coldCreations, warmedCreations) = | ||
ContainerManager.filterWarmedCreations(warmedContainers, inProgressWarmedContainers, invokers, msgs) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can add unit tests for filterWarmedCreations
now.
coldCreations.map(_._1).contains(msg3) shouldBe true | ||
} | ||
|
||
it should "choose cold creation when warmed containers are in disabled invokers" in { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This test will guarantee no disabled invoker is used because of warmed containers.
val warmedApplied = msgs.map { msg => | ||
val warmedPrefix = | ||
containerPrefix(ContainerKeys.warmedPrefix, msg.invocationNamespace, msg.action, Some(msg.revision)) | ||
val container = warmedContainers |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
container -> containers ? because it may has several values in Seq.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I updated the logic to find
only one container.
val updatedInvokers = chosenInvokers.foldLeft(invokers) { (invokers, chosenInvoker) => | ||
chosenInvoker match { | ||
case Some((chosenInvoker, msg)) => | ||
updateInvokerMemory(chosenInvoker, msg.whiskActionMetaData.limits.memory.megabytes, invokers) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After executed updateInvokerMemory
, the invokers's memory info would be synced in all schedulers?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Due to warmedContainers already existed in invoker side, seems it will just use existing warmed container, it will consume new memory?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After executed updateInvokerMemory, the invokers's memory info would be synced in all schedulers?
It is not synchronized among schedulers.
The actual memory usage is updated by invokers when a new container is created.
If there is not enough memory in an invoker, it will respond with rescheduling ack, and the message is scheduled to another invoker.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So here, update memory in ContainerManager, just improve the performance? can judge in advance?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When multiple messages are scheduled, we need to consider already scheduled memory.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Due to warmedContainers already existed in invoker side, seems it will just use existing warmed container, it will consume new memory?
I confirmed that warmed container memories are not considered as busy memory.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hm.for coldCreations
, i understand why need to updateInvokerMemory(scheduledPair.invokerId, msg.whiskActionMetaData.limits.memory.megabytes, invokers)
But for warmCreations
, i don't understand why need to that,
in my understanding, seems has no need to update memory for warmCreations
becausee it would not consume memory.
And for coldCreations
, i still have some concerns, e.g.
If has a lot of coldCreations
, so update memory operation may executes frequently during a very short time, but invoker side, there still exist a lot of
free memory.
I am not sure whether this case would happen.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
becausee it would not consume memory.
First, it "would" consume memory.
When an invoker advertises its memory, it does not include the memory of warmed containers.
So no matter how many warm containers are there, the invoker will advertise full free memory if there is no running container.
Second, when we schedule containers, let's say we need to schedule 5 containers, if the first 3 are scheduled to some invokers, we need to consider the scheduling result to reduce the invokers' free memory when scheduling the rest 2 containers.
Since the getAvailableInvokers
method is executed only once for each container creation list, we need to schedule containers based on a snapshot of invoker memory.
But still, it should be updated during scheduling. This is to guarantee at least we don't send a container creation request to an invoker with not enough resources based on a snapshot.
For the next container creation list, it will fetch the actual invoker resources again and do the same.
And even if we here update the invoker resources, the actual invoker resources are only updated by invokers and advertised via ETCD. We here just update the snapshot of invoker resources for the subsequent scheduling.
As you said, there would be other schedulers, and they can also schedule some containers to the same invoker.
But invokers are randomly selected, and it would rarely saturate an invoker due to concurrent requests to the invoker. When there are not enough resources in the whole cluster, that may happen frequently, but if such a situation perpetuates, then that implies we need to scale out our cluster.
Also, even if an invoker without enough resources receives container creation requests, it will return it to reschedule to another invoker.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Got it
if (candidates.isEmpty) { | ||
ScheduledPair(msg, invokerId = None, Some(NoAvailableInvokersError), Some(s"No available invokers.")) | ||
} else if (candidates.forall(p => p.id.userMemory.toMB < requiredMemory.megabytes)) { | ||
ScheduledPair(msg, invokerId = None, Some(NoAvailableInvokersError), Some(s"No available invokers.")) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems case1 and case2's reason is same No available invokers
But they has small difference, e.g. case2, No available invokers with lack memory.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So do you suggest adding a new error?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hm...no. i think it is ok.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I changed the error to ResourceNotEnoughError
Codecov Report
@@ Coverage Diff @@
## master #5313 +/- ##
==========================================
- Coverage 81.13% 76.41% -4.72%
==========================================
Files 239 239
Lines 14192 14238 +46
Branches 580 599 +19
==========================================
- Hits 11514 10880 -634
- Misses 2678 3358 +680
📣 We’re building smart automated test selection to slash your CI/CD build times. Learn more |
9b3fcf2
to
5cf819e
Compare
warmUp() | ||
complete("Success enable invoker") | ||
} | ||
|
||
override def disable(): Route = { | ||
invokerHealthManager ! GracefulShutdown | ||
pool ! GracefulShutdown | ||
consumer.foreach(_.close()) | ||
consumer = None |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Even if the invoker is disabled, it handles messages that may come in for a while. 👍
...cheduler/src/main/scala/org/apache/openwhisk/core/scheduler/container/ContainerManager.scala
Show resolved
Hide resolved
core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerReactive.scala
Outdated
Show resolved
Hide resolved
@@ -636,6 +636,23 @@ object ContainerMessage extends DefaultJsonProtocol { | |||
sealed trait ContainerCreationError | |||
|
|||
object ContainerCreationError extends Enumeration { | |||
import scala.language.implicitConversions | |||
implicit def containerCreationErrorToString(x: ContainerCreationError): String = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added an implicit conversion to remove the reason
field from this.
https://github.com/apache/openwhisk/pull/5313/files#diff-32b47fe5e4d4cdb36b4ea9095511f467e626667c207ca52f3064535d4b8b28d3R56
LGTM |
merged as it's a critical issue that breaks the scheduling fundamental. |
* Exclude warmed containers in disabled invokers. * Exclude warmed containers in disabled invokers. * Find the first warmed container. * Remove the code added by mistake. * Add more logs for error cases.
Description
This is not to send container creation messages to disabled invokers.
Related issue and scope
My changes affect the following components
Types of changes
Checklist: