Memory-based load balancing #3747
Changes to ShardingContainerPoolBalancerState:

```diff
@@ -594,6 +594,12 @@ case class ShardingContainerPoolBalancerState(
     val actualSize = newSize max 1 // if a cluster size < 1 is reported, falls back to a size of 1 (alone)
     if (_clusterSize != actualSize) {
       _clusterSize = actualSize
+      if (totalInvokerThreshold / actualSize < MemoryLimit.minMemory) {
+        logging.warn(
+          this,
+          s"registered controllers: ${_clusterSize}: the slots per invoker fall below the min memory of one action.")(
+          TransactionId.loadbalancer)
+      }
       val newTreshold = (totalInvokerThreshold / actualSize) max MemoryLimit.minMemory // letting this fall below minMemory doesn't make sense
       currentInvokerThreshold = newTreshold
       _invokerSlots = _invokerSlots.map(_ => new ForcibleSemaphore(currentInvokerThreshold.toMB.toInt))
```

Review comment on the `val newTreshold` line: Should we log a warn message if this would fall below minMemory, to let the operator know?
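The arithmetic behind this change is worth spelling out: every controller gets an equal share of each invoker's memory, and that share is clamped at the smallest schedulable action. The sketch below reproduces the division-and-clamp logic with plain MB values; `totalInvokerThresholdMB` and `minMemoryMB` are illustrative stand-ins for the real `ByteSize` fields, and the concrete numbers are assumptions, not OpenWhisk defaults.

```scala
object ThresholdSketch extends App {
  val totalInvokerThresholdMB = 2048L // assumed memory per invoker
  val minMemoryMB = 128L              // assumed MemoryLimit.minMemory

  def thresholdFor(clusterSize: Int): Long = {
    val actualSize = clusterSize max 1 // guard against a reported size < 1
    if (totalInvokerThresholdMB / actualSize < minMemoryMB)
      println(s"registered controllers: $actualSize: slots per invoker fall below the min memory of one action")
    (totalInvokerThresholdMB / actualSize) max minMemoryMB
  }

  assert(thresholdFor(4) == 512)          // 2048 / 4 = 512 MB per controller
  assert(thresholdFor(32) == minMemoryMB) // 2048 / 32 = 64 MB -> warned and clamped to 128 MB
}
```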
Changes to ContainerPool:

```diff
@@ -95,58 +95,54 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
     // their requests and send them back to the pool for rescheduling (this may happen if "docker" operations
     // fail for example, or a container has aged and was destroying itself when a new request was assigned)
     case r: Run =>
+      val isFirstMessageOnBuffer = runBuffer.dequeueOption.exists(_._1.msg == r.msg)
+      val isResentFromBuffer = runBuffer.nonEmpty && isFirstMessageOnBuffer
+
       // Only process request, if there are no other requests waiting for free slots, or if the current request is the next request to process
-      if (runBuffer.isEmpty || runBuffer.dequeueOption.exists(_._1.msg == r.msg)) {
+      if (runBuffer.isEmpty || isFirstMessageOnBuffer) {
         val createdContainer =
-          if (busyPool
-                .map(_._2.memoryLimit.toMB)
-                .sum + r.action.limits.memory.megabytes <= poolConfig.userMemory.toMB) {
+          // Is there enough space on the invoker for this action to be executed?
+          if (hasPoolSpaceFor(busyPool, r.action.limits.memory.megabytes.MB)) {
             // Schedule a job to a warm container
             ContainerPool
               .schedule(r.action, r.msg.user.namespace.name, freePool)
-              .map(container => {
-                (container, "warm")
-              })
-              .orElse {
-                if (busyPool
-                      .map(_._2.memoryLimit.toMB)
-                      .sum + freePool.map(_._2.memoryLimit.toMB).sum < poolConfig.userMemory.toMB) {
+              .map(container => (container, "warm"))
+              .orElse(
+                // There was no warm container. Try to take a prewarm container or a cold container.
+                // Is there enough space to create a new container, or do other containers have to be removed?
+                if (hasPoolSpaceFor(busyPool ++ freePool, r.action.limits.memory.megabytes.MB)) {
                   takePrewarmContainer(r.action)
-                    .map(container => {
-                      (container, "prewarmed")
-                    })
-                    .orElse {
-                      Some(createContainer(r.action.limits.memory.megabytes.MB), "cold")
-                    }
-                } else None
-              }
-              .orElse {
+                    .map(container => (container, "prewarmed"))
+                    .orElse(Some(createContainer(r.action.limits.memory.megabytes.MB), "cold"))
+                } else None)
+              .orElse(
                 // Remove a container and create a new one for the given job
                 ContainerPool
-                  .remove(freePool, r.action.limits.memory.megabytes.MB)
+                  // Only free up the amount that really needs to be freed up
+                  .remove(
+                    freePool,
+                    Math.min(r.action.limits.memory.megabytes, freePool.map(_._2.memoryLimit.toMB).sum).MB)
                   .map(removeContainer)
                   // If the list had at least one entry, enough containers were removed to start the new container. After
                   // removing the containers, we are not interested anymore in the containers that have been removed.
                   .headOption
-                  .map { _ =>
+                  .map(_ =>
                     takePrewarmContainer(r.action)
-                      .map(container => {
-                        (container, "recreated")
-                      })
-                      .getOrElse {
-                        (createContainer(r.action.limits.memory.megabytes.MB), "recreated")
-                      }
-                  }
-              }
+                      .map(container => (container, "recreated prewarm"))
+                      .getOrElse(createContainer(r.action.limits.memory.megabytes.MB), "recreated")))
           } else None

       createdContainer match {
         case Some(((actor, data), containerState)) =>
           busyPool = busyPool + (actor -> data)
           freePool = freePool - actor
-          // Remove the action that get's executed now from the buffer and execute the next one afterwards.
-          runBuffer = runBuffer.dequeueOption.map(_._2).getOrElse(runBuffer)
-          runBuffer.dequeueOption.foreach { case (run, _) => self ! run }
+          if (isResentFromBuffer) {
+            // It is guaranteed that the currently executed message is the head of the queue, if the message comes from the buffer
+            runBuffer = runBuffer.dequeue._2
+            runBuffer.dequeueOption.foreach { case (run, _) => self ! run }
+          }
           actor ! r // forwards the run request to the container
           logContainerStart(r, containerState)
         case None =>
```

Review comment on the `"recreated prewarm"` state string: Better to use a non-space separator like …
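The scheduling decision above is a cascade of `Option` fallbacks: a warm container, else (if there is space) a prewarm or cold container, else evict and recreate. A reduced sketch of that shape, with stubbed stages; all names and return values here are illustrative, not the project's API:

```scala
object ScheduleCascadeSketch extends App {
  // Stubbed stages (assumptions): each returns Some(label) when it succeeds.
  def scheduleWarm: Option[String]    = None
  def hasSpace: Boolean               = true
  def takePrewarm: Option[String]     = None
  def createCold: String              = "cold"
  def evictThenCreate: Option[String] = Some("recreated")

  val createdContainer: Option[String] =
    scheduleWarm
      .map(c => s"$c (warm)")
      .orElse(
        // no warm container: try prewarm, then cold, but only if there is space
        if (hasSpace) takePrewarm.map(c => s"$c (prewarmed)").orElse(Some(createCold))
        else None)
      .orElse(evictThenCreate) // last resort: evict free containers and recreate

  println(createdContainer) // Some(cold): no warm/prewarm hit, but space for a cold start
}
```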
```diff
@@ -167,7 +163,7 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
       } else {
         r.retryLogDeadline
       }
-      if (!runBuffer.exists(_.msg == r.msg)) {
+      if (!isResentFromBuffer) {
         // Add this request to the buffer, as it is not there yet.
         runBuffer = runBuffer.enqueue(r)
       }
```
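To see why `isResentFromBuffer` can replace the weaker `runBuffer.exists` check, here is a minimal, self-contained sketch of the buffering protocol built on `scala.collection.immutable.Queue`. The `Run` type is a simplified assumption; only the queue manipulation mirrors the diff.

```scala
import scala.collection.immutable.Queue

// Simplified stand-in for the real Run message (assumption).
final case class Run(msg: String)

object RunBufferSketch extends App {
  var runBuffer = Queue(Run("a"), Run("b"))

  val r = Run("a") // a message redelivered from the buffer

  // A request counts as resent iff the buffer is non-empty and r is its head.
  val isFirstMessageOnBuffer = runBuffer.dequeueOption.exists(_._1.msg == r.msg)
  val isResentFromBuffer = runBuffer.nonEmpty && isFirstMessageOnBuffer
  assert(isResentFromBuffer)

  // On successful scheduling of a resent message: it is guaranteed to be the
  // head, so drop it and trigger the next buffered message (if any).
  if (isResentFromBuffer) {
    runBuffer = runBuffer.dequeue._2
    runBuffer.dequeueOption.foreach { case (next, _) => println(s"self ! $next") }
  }

  // On failure to schedule: enqueue only if this is not already a buffered
  // message (previously checked with runBuffer.exists(_.msg == r.msg)).
  if (!isResentFromBuffer) runBuffer = runBuffer.enqueue(r)

  assert(runBuffer == Queue(Run("b")))
}
```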
```diff
@@ -256,6 +252,10 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
     freePool = freePool - toDelete
     busyPool = busyPool - toDelete
   }
+
+  def hasPoolSpaceFor[A](pool: Map[A, ContainerData], memory: ByteSize): Boolean = {
+    pool.map(_._2.memoryLimit.toMB).sum + memory.toMB <= poolConfig.userMemory.toMB
+  }
 }

 object ContainerPool {
```
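A quick standalone illustration of the new helper, with `ContainerData` reduced to its memory limit and an assumed 1024 MB of user memory (both are stand-ins, not project values):

```scala
object PoolSpaceSketch extends App {
  // Stand-in for ContainerData: only the memory limit matters here (assumption).
  final case class ContainerData(memoryLimitMB: Long)

  val userMemoryMB = 1024L // assumed poolConfig.userMemory

  // The action fits iff the pool's held memory plus the action's limit
  // stays within the invoker's user memory.
  def hasPoolSpaceFor[A](pool: Map[A, ContainerData], memoryMB: Long): Boolean =
    pool.values.map(_.memoryLimitMB).sum + memoryMB <= userMemoryMB

  val busyPool = Map("c1" -> ContainerData(512), "c2" -> ContainerData(256))
  assert(hasPoolSpaceFor(busyPool, 256))  // 768 + 256 <= 1024
  assert(!hasPoolSpaceFor(busyPool, 512)) // 768 + 512 >  1024
}
```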
```diff
@@ -299,18 +299,20 @@ object ContainerPool {
       case (ref, w: WarmedData) => ref -> w
     }

-    if (freeContainers.nonEmpty && freeContainers.map(_._2.memoryLimit.toMB).sum >= memory.toMB) {
-      if (memory > 0.B) {
-        val (ref, data) = freeContainers.minBy(_._2.lastUsed)
-        // Catch exception if remaining memory will be negative
-        val remainingMemory = Try(memory - data.memoryLimit).getOrElse(0.B)
-        List(ref) ++ remove(freeContainers.filterKeys(_ != ref), remainingMemory)
-      } else {
-        // Enough containers are found to get the memory, that is necessary. -> Abort recursion
-        List.empty
-      }
-      // else case: All containers are in use currently, or there is more memory needed than containers can be removed.
-    } else List.empty
+    if (memory > 0.B && freeContainers.nonEmpty && freeContainers.map(_._2.memoryLimit.toMB).sum >= memory.toMB) {
+      // Remove the oldest container if:
+      // - more memory is still required
+      // - there are still containers that can be removed
+      // - the free containers together hold enough memory to be removed
+      val (ref, data) = freeContainers.minBy(_._2.lastUsed)
+      // Catch exception if the remaining memory would be negative
+      val remainingMemory = Try(memory - data.memoryLimit).getOrElse(0.B)
+      List(ref) ++ remove(freeContainers.filterKeys(_ != ref), remainingMemory)
+    } else {
+      // On the first call: all containers are currently in use, or more memory is needed than removing containers could free.
+      // On one of the recursions: enough containers have been found to free the necessary memory. -> Abort recursion
+      List.empty
+    }
   }

   def props(factory: ActorRefFactory => ActorRef,
```
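The rewritten `remove` is easier to reason about as a standalone recursion. The sketch below strips away the OpenWhisk types (`WarmedData`, `ByteSize`) and keeps only the all-or-nothing eviction logic: evict least-recently-used free containers until the requested memory is covered, or return nothing if the free pool can never satisfy the request.

```scala
object RemoveSketch extends App {
  // Stand-in for WarmedData (assumption): last-used timestamp and memory limit.
  final case class Warm(lastUsed: Long, memoryLimitMB: Long)

  def remove[A](freeContainers: Map[A, Warm], memoryMB: Long): List[A] =
    if (memoryMB > 0 && freeContainers.nonEmpty &&
        freeContainers.values.map(_.memoryLimitMB).sum >= memoryMB) {
      // Evict the least-recently-used container and recurse on the rest.
      val (ref, data) = freeContainers.minBy(_._2.lastUsed)
      val remaining = math.max(memoryMB - data.memoryLimitMB, 0L)
      ref :: remove(freeContainers - ref, remaining)
    } else Nil // either done (memory covered) or impossible (not enough free memory)

  val free = Map("old" -> Warm(1, 256), "new" -> Warm(2, 256))
  assert(remove(free, 256) == List("old"))        // oldest container goes first
  assert(remove(free, 512) == List("old", "new")) // both needed
  assert(remove(free, 768) == Nil)                // can never be satisfied -> empty list
}
```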
Changes to ContainerPoolTests:

```diff
@@ -131,6 +131,7 @@ class ContainerPoolTests
   it should "reuse a warm container" in within(timeout) {
     val (containers, factory) = testContainers(2)
     val feed = TestProbe()
+    // Actions are created with the default memory limit (MemoryLimit.stdMemory). This means 4 actions can be scheduled.
     val pool = system.actorOf(ContainerPool.props(factory, poolConfig(MemoryLimit.stdMemory * 4), feed.ref))

     pool ! runMessage
@@ -145,6 +146,7 @@ class ContainerPoolTests
   it should "reuse a warm container when action is the same even if revision changes" in within(timeout) {
     val (containers, factory) = testContainers(2)
     val feed = TestProbe()
+    // Actions are created with the default memory limit (MemoryLimit.stdMemory). This means 4 actions can be scheduled.
     val pool = system.actorOf(ContainerPool.props(factory, poolConfig(MemoryLimit.stdMemory * 4), feed.ref))

     pool ! runMessage
@@ -160,6 +162,7 @@ class ContainerPoolTests
     val (containers, factory) = testContainers(2)
     val feed = TestProbe()

+    // Actions are created with the default memory limit (MemoryLimit.stdMemory). This means 4 actions can be scheduled.
     val pool = system.actorOf(ContainerPool.props(factory, poolConfig(MemoryLimit.stdMemory * 4), feed.ref))
     pool ! runMessage
     containers(0).expectMsg(runMessage)
```
```diff
@@ -189,19 +192,19 @@ class ContainerPoolTests
     val (containers, factory) = testContainers(3)
     val feed = TestProbe()

-    // a pool with slots for 512MB
+    // a pool with slots for 2 actions with default memory limit.
     val pool = system.actorOf(ContainerPool.props(factory, poolConfig(512.MB), feed.ref))
     pool ! runMessage
     containers(0).expectMsg(runMessage)
-    pool ! runMessageDifferentAction
+    pool ! runMessageDifferentAction // 2 * stdMemory taken -> full
     containers(1).expectMsg(runMessageDifferentAction)

-    containers(0).send(pool, NeedWork(warmedData()))
+    containers(0).send(pool, NeedWork(warmedData())) // first action finished -> 1 * stdMemory taken
     feed.expectMsg(MessageFeed.Processed)
-    containers(1).send(pool, NeedWork(warmedData()))
+    containers(1).send(pool, NeedWork(warmedData())) // second action finished -> 1 * stdMemory taken
     feed.expectMsg(MessageFeed.Processed)

-    pool ! runMessageLarge
+    pool ! runMessageLarge // need to remove both actions to make space for the large action (needs 2 * stdMemory)
     containers(0).expectMsg(Remove)
     containers(1).expectMsg(Remove)
     containers(2).expectMsg(runMessageLarge)
```

Review comment: Comments in here would be nice, like:

```scala
containers(0).expectMsg(runMessage) // 1 * stdMemory taken
pool ! runMessageDifferentAction
containers(1).expectMsg(runMessageDifferentAction) // 2 * stdMemory taken -> full
...
pool ! runMessageLarge
// need to remove both actions to make space for the large action (needs 2 * stdMemory)
containers(0).expectMsg(Remove)
containers(1).expectMsg(Remove)
containers(2).expectMsg(runMessageLarge)
```
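For readers following the memory accounting in this test, the numbers work out as sketched below, assuming the default `MemoryLimit.stdMemory` of 256 MB (the exact value is an assumption here):

```scala
object PoolAccountingSketch extends App {
  val stdMemoryMB = 256L // assumed MemoryLimit.stdMemory
  val poolMB = 512L      // poolConfig(512.MB): room for exactly two standard actions

  var heldMB = 0L
  heldMB += stdMemoryMB // runMessage scheduled            -> 256 MB held
  heldMB += stdMemoryMB // runMessageDifferentAction       -> 512 MB held, pool full

  // NeedWork only moves containers from busy to free; their memory stays held,
  // so the 512 MB large action still requires removing both warm containers.
  val largeMB = 2 * stdMemoryMB
  assert(poolMB - heldMB < largeMB) // 0 MB free < 512 MB needed -> Remove, Remove
}
```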
```diff
@@ -269,15 +272,15 @@ class ContainerPoolTests
     val pool = system.actorOf(ContainerPool.props(factory, poolConfig(MemoryLimit.stdMemory * 2), feed.ref))

     // Start first action
-    pool ! runMessage
+    pool ! runMessage // 1 * stdMemory taken
     containers(0).expectMsg(runMessage)

     // Send second action to the pool
-    pool ! runMessageLarge
+    pool ! runMessageLarge // message is too large to be processed immediately.
     containers(1).expectNoMessage(100.milliseconds)

     // First action is finished
-    containers(0).send(pool, NeedWork(warmedData()))
+    containers(0).send(pool, NeedWork(warmedData())) // pool is empty again.
     feed.expectMsg(MessageFeed.Processed)

     // Second action should run now
```
```diff
@@ -388,7 +391,8 @@ class ContainerPoolTests
     val feed = TestProbe()

     // Pool with 512 MB usermemory
-    val pool = system.actorOf(ContainerPool.props(factory, ContainerPoolConfig(MemoryLimit.stdMemory * 2), feed.ref))
+    val pool =
+      system.actorOf(ContainerPool.props(factory, poolConfig(MemoryLimit.stdMemory * 2), feed.ref))

     // Send action that blocks the pool
     pool ! runMessageLarge
@@ -420,7 +424,7 @@ class ContainerPoolTests
     val feed = TestProbe()

     // Pool with 512 MB usermemory
-    val pool = system.actorOf(ContainerPool.props(factory, ContainerPoolConfig(MemoryLimit.stdMemory * 2), feed.ref))
+    val pool = system.actorOf(ContainerPool.props(factory, poolConfig(MemoryLimit.stdMemory * 2), feed.ref))

     // Send 4 actions to the ContainerPool (Action 0, Action 2 and Action 3 with each 256 MB and Action 1 with 512 MB)
     pool ! runMessage
```
Review comment: Would it make sense to add a test which tests that if you cannot make space for enough capacity, the List is empty?

```diff
@@ -576,6 +580,12 @@ class ContainerPoolObjectTests extends FlatSpec with Matchers with MockFactory {
     ContainerPool.remove(pool, MemoryLimit.stdMemory) shouldBe List.empty
   }

+  it should "not provide a container from pool if there is not enough capacity" in {
+    val pool = Map('first -> warmedData())
+
+    ContainerPool.remove(pool, MemoryLimit.stdMemory * 2) shouldBe List.empty
+  }
+
   it should "provide a container from pool with one single free container" in {
     val data = warmedData()
     val pool = Map('warm -> data)
```
Review comment: @cbickel Should this be named `invoker-user-memory`? I see an exception on startup, because in my case `CONFIG_whisk_loadbalancer_invokerUserMemory` was not defined.

Reply: You are right. I'll open a PR to correct this. Thank you.

Reply: #3993 has the fix.
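For context, OpenWhisk loads these settings with pureconfig, which by default maps camelCase Scala field names to kebab-case config keys, which is why the camelCase key in the environment variable above was never found. The sketch below is a hypothetical minimal reproduction, assuming a pureconfig version that auto-derives readers from `import pureconfig._` (as OpenWhisk did at the time; newer versions also need `pureconfig.generic.auto._`). The config class and namespace are illustrative, not the project's actual definitions.

```scala
import pureconfig._

// Hypothetical config class: the field invokerUserMemory is resolved from the
// kebab-cased key `whisk.loadbalancer.invoker-user-memory`, not from
// `whisk.loadbalancer.invokerUserMemory`.
final case class LoadBalancerSketchConfig(invokerUserMemory: String)

object ConfigNamingSketch extends App {
  val lbConfig = loadConfigOrThrow[LoadBalancerSketchConfig]("whisk.loadbalancer")
  println(lbConfig.invokerUserMemory)
}
```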