Commit
cbickel committed Jul 2, 2018
1 parent a4ced20 commit 2e334a2
Showing 5 changed files with 64 additions and 54 deletions.
2 changes: 1 addition & 1 deletion ansible/group_vars/all
@@ -166,7 +166,7 @@ invoker:
   port: 12001
   heap: "{{ invoker_heap | default('2g') }}"
   arguments: "{{ invoker_arguments | default('') }}"
-  userMemory: "{{ invoker_user_memory | default('4096 m') }}"
+  userMemory: "{{ invoker_user_memory | default('1024 m') }}"
   instances: "{{ groups['invokers'] | length }}"
   # Specify if it is allowed to deploy more than 1 invoker on a single machine.
   allowMultipleInstances: "{{ invoker_allow_multiple_instances | default(false) }}"
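The lowered default can still be overridden per deployment through the usual ansible variable mechanism. A minimal sketch; the file placement and value here are illustrative, not part of this commit:

    # e.g. in an environment's group_vars file (hypothetical placement)
    invoker_user_memory: "2048 m"

Passing -e invoker_user_memory='2048 m' on the ansible-playbook command line would take effect the same way, since the default() filter only applies when the variable is unset.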
1 change: 1 addition & 0 deletions core/controller/src/main/resources/reference.conf
@@ -6,6 +6,7 @@ whisk {
     use-cluster-bootstrap: false
   }
   loadbalancer {
+    user-memory: 1024 m
     blackbox-fraction: 10%
   }
   controller {
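For orientation, settings in this block are typically materialized through pureconfig. A minimal sketch of how a ByteSize-valued key like user-memory can be read; the case class, field name, and import path are assumptions for illustration, not necessarily the project's actual ones:

    import pureconfig.loadConfigOrThrow
    import whisk.core.entity.ByteSize // path assumed; an implicit ConfigReader[ByteSize] must be in scope

    // Hypothetical holder for the whisk.loadbalancer block; pureconfig maps the
    // kebab-case key user-memory onto the camelCase field userMemory.
    case class LoadbalancerConfig(userMemory: ByteSize)

    val lbConfig = loadConfigOrThrow[LoadbalancerConfig]("whisk.loadbalancer")
    // lbConfig.userMemory is 1024 MB unless overridden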
6 changes: 6 additions & 0 deletions ShardingContainerPoolBalancer.scala
@@ -471,6 +471,12 @@ case class ShardingContainerPoolBalancerState(
     val actualSize = newSize max 1 // if a cluster size < 1 is reported, fall back to a size of 1 (alone)
     if (_clusterSize != actualSize) {
       _clusterSize = actualSize
+      if (totalInvokerThreshold / actualSize < MemoryLimit.minMemory) {
+        logging.warn(
+          this,
+          s"registered controllers: ${_clusterSize}: the slots per invoker fall below the min memory of one action.")(
+          TransactionId.loadbalancer)
+      }
       val newTreshold = (totalInvokerThreshold / actualSize) max MemoryLimit.minMemory // letting this fall below minMemory doesn't make sense
       currentInvokerThreshold = newTreshold
       _invokerSlots = _invokerSlots.map(_ => new ForcableSemaphore(currentInvokerThreshold.toMB.toInt))
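To make the new guard concrete, here is the slot arithmetic with assumed numbers (1024 MB of user memory per invoker and a 128 MB minimum action memory; both values are illustrative, not taken from this commit):

    // Each controller owns an equal share of every invoker's user memory.
    val totalInvokerThresholdMB = 1024
    val minMemoryMB = 128

    // 2 controllers -> 512 MB of slots per invoker each: fine.
    // 16 controllers -> a raw share of 64 MB, below minMemory, so the
    // warning fires and the threshold is floored:
    val clusterSize = 16
    val share = totalInvokerThresholdMB / clusterSize // 64
    val threshold = share max minMemoryMB             // floored to 128

Note that flooring means the per-controller shares can oversubscribe the invoker once the cluster is large enough, which is exactly the situation the warning flags.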
ContainerPool.scala
@@ -95,58 +95,50 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
   // their requests and send them back to the pool for rescheduling (this may happen if "docker" operations
   // fail for example, or a container has aged and was destroying itself when a new request was assigned)
   case r: Run =>
+    val isResentFromBuffer = !runBuffer.isEmpty && runBuffer.dequeueOption.exists(_._1.msg == r.msg)
+
     // Only process the request if there are no other requests waiting for free slots, or if the current request is the next request to process
     if (runBuffer.isEmpty || runBuffer.dequeueOption.exists(_._1.msg == r.msg)) {
       val createdContainer =
-        if (busyPool
-              .map(_._2.memoryLimit.toMB)
-              .sum + r.action.limits.memory.megabytes <= poolConfig.userMemory.toMB) {
+        // Is there enough space on the invoker for this action to be executed?
+        if (hasPoolSpaceFor(busyPool, r.action.limits.memory.megabytes.MB)) {
           // Schedule a job to a warm container
           ContainerPool
             .schedule(r.action, r.msg.user.namespace.name, freePool)
-            .map(container => {
-              (container, "warm")
-            })
-            .orElse {
-              if (busyPool
-                    .map(_._2.memoryLimit.toMB)
-                    .sum + freePool.map(_._2.memoryLimit.toMB).sum < poolConfig.userMemory.toMB) {
+            .map(container => (container, "warm"))
+            .orElse(
+              // There was no warm container. Try to take a prewarm container or a cold container.
+
+              // Is there enough space to create a new container, or do other containers have to be removed?
+              if (hasPoolSpaceFor(busyPool ++ freePool, r.action.limits.memory.megabytes.MB)) {
                 takePrewarmContainer(r.action)
-                  .map(container => {
-                    (container, "prewarmed")
-                  })
-                  .orElse {
-                    Some(createContainer(r.action.limits.memory.megabytes.MB), "cold")
-                  }
-              } else None
-            }
-            .orElse {
+                  .map(container => (container, "prewarmed"))
+                  .orElse(Some(createContainer(r.action.limits.memory.megabytes.MB), "cold"))
+              } else None)
+            .orElse(
               // Remove a container and create a new one for the given job
               ContainerPool
                 .remove(freePool, r.action.limits.memory.megabytes.MB)
                 .map(removeContainer)
                 // If the list had at least one entry, enough containers were removed to start the new container. After
                 // removing the containers, we are not interested anymore in the containers that have been removed.
                 .headOption
-                .map { _ =>
+                .map(_ =>
                   takePrewarmContainer(r.action)
-                    .map(container => {
-                      (container, "recreated")
-                    })
-                    .getOrElse {
-                      (createContainer(r.action.limits.memory.megabytes.MB), "recreated")
-                    }
-                }
-            }
+                    .map(container => (container, "recreated prewarm"))
+                    .getOrElse(createContainer(r.action.limits.memory.megabytes.MB), "recreated")))
         } else None

     createdContainer match {
       case Some(((actor, data), containerState)) =>
         busyPool = busyPool + (actor -> data)
         freePool = freePool - actor
-        // Remove the action that gets executed now from the buffer and execute the next one afterwards.
-        runBuffer = runBuffer.dequeueOption.map(_._2).getOrElse(runBuffer)
-        runBuffer.dequeueOption.foreach { case (run, _) => self ! run }
+        if (isResentFromBuffer) {
+          // It is guaranteed that the currently executed message is the head of the queue, if the message comes from the buffer
+          runBuffer = runBuffer.dequeue._2
+          runBuffer.dequeueOption.foreach { case (run, _) => self ! run }
+        }
         actor ! r // forwards the run request to the container
         logContainerStart(r, containerState)
       case None =>
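In words, the refactored chain above tries a warm container first, then a prewarm or cold container if the whole pool still has room, and finally evicts free containers to make room. A compact stand-alone sketch of that precedence; the helper names are stand-ins, not the real ContainerPool API:

    // Stubbed decisions, purely for illustration:
    def warmContainer(): Option[String] = None                  // no warm hit
    def poolHasRoom(): Boolean = true
    def prewarmOrCold(): Option[(String, String)] = Some("prewarm-0" -> "prewarmed")
    def evictThenCreate(): Option[(String, String)] = Some("cold-0" -> "recreated")

    // Same warm -> prewarm/cold -> evict precedence as the diff above:
    val created: Option[(String, String)] =
      warmContainer().map(_ -> "warm")
        .orElse(if (poolHasRoom()) prewarmOrCold() else None)
        .orElse(evictThenCreate())
    // created == Some(("prewarm-0", "prewarmed"))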
@@ -167,7 +159,7 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
       } else {
         r.retryLogDeadline
       }
-      if (!runBuffer.exists(_.msg == r.msg)) {
+      if (!isResentFromBuffer) {
         // Add this request to the buffer, as it is not there yet.
         runBuffer = runBuffer.enqueue(r)
       }
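The buffer discipline above is easiest to see on a bare Queue. A toy illustration with Strings standing in for Run messages (names assumed):

    import scala.collection.immutable.Queue

    var runBuffer = Queue("a", "b")
    val r = "a"
    // Only the head of the queue counts as a legitimate resend:
    val isResentFromBuffer = runBuffer.nonEmpty && runBuffer.dequeueOption.exists(_._1 == r) // true
    // Once the head is actually scheduled it is dropped, and the next entry is re-sent:
    runBuffer = runBuffer.dequeue._2                                               // Queue("b")
    runBuffer.dequeueOption.foreach { case (next, _) => println(s"resend $next") } // resend b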
@@ -256,6 +248,10 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
     freePool = freePool - toDelete
     busyPool = busyPool - toDelete
   }
+
+  def hasPoolSpaceFor[A](pool: Map[A, ContainerData], memory: ByteSize): Boolean = {
+    pool.map(_._2.memoryLimit.toMB).sum + memory.toMB <= poolConfig.userMemory.toMB
+  }
 }

 object ContainerPool {
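A stand-alone rendering of the new capacity predicate, with ByteSize reduced to plain MB counts and Data as a stand-in for ContainerData:

    case class Data(memoryLimitMB: Long)

    def hasPoolSpaceFor[A](pool: Map[A, Data], memoryMB: Long, userMemoryMB: Long): Boolean =
      pool.values.map(_.memoryLimitMB).sum + memoryMB <= userMemoryMB

    hasPoolSpaceFor(Map("c1" -> Data(256)), 256, 512) // true: 256 + 256 <= 512
    hasPoolSpaceFor(Map("c1" -> Data(256)), 512, 512) // false: would oversubscribe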
@@ -299,18 +295,16 @@ object ContainerPool {
       case (ref, w: WarmedData) => ref -> w
     }

-    if (freeContainers.nonEmpty && freeContainers.map(_._2.memoryLimit.toMB).sum >= memory.toMB) {
-      if (memory > 0.B) {
-        val (ref, data) = freeContainers.minBy(_._2.lastUsed)
-        // Catch exception if remaining memory will be negative
-        val remainingMemory = Try(memory - data.memoryLimit).getOrElse(0.B)
-        List(ref) ++ remove(freeContainers.filterKeys(_ != ref), remainingMemory)
-      } else {
-        // Enough containers have been found to free the necessary memory. -> Abort recursion
-        List.empty
-      }
-      // else case: All containers are currently in use, or more memory is needed than can be freed by removing containers.
-    } else List.empty
+    if (memory > 0.B && freeContainers.nonEmpty && freeContainers.map(_._2.memoryLimit.toMB).sum >= memory.toMB) {
+      val (ref, data) = freeContainers.minBy(_._2.lastUsed)
+      // Catch exception if remaining memory will be negative
+      val remainingMemory = Try(memory - data.memoryLimit).getOrElse(0.B)
+      List(ref) ++ remove(freeContainers.filterKeys(_ != ref), remainingMemory)
+    } else {
+      // If this is the first call: all containers are currently in use, or more memory is needed than can be freed by removing containers.
+      // If this is one of the recursions: enough containers have been found to free the necessary memory. -> Abort recursion
+      List.empty
+    }
   }

   def props(factory: ActorRefFactory => ActorRef,
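The recursion in remove is easier to follow outside the diff. A simplified sketch with the same oldest-first, all-or-nothing behavior; types are reduced to (lastUsed, memoryMB) pairs and the name is assumed:

    // Evict least-recently-used free containers until needMB is covered.
    // If the free pool can never cover the request, evict nothing at all.
    def removeOldest[A](free: Map[A, (Long, Long)], needMB: Long): List[A] =
      if (needMB > 0 && free.nonEmpty && free.values.map(_._2).sum >= needMB) {
        val (ref, (_, mem)) = free.minBy { case (_, (lastUsed, _)) => lastUsed }
        ref :: removeOldest(free - ref, (needMB - mem) max 0)
      } else List.empty

    removeOldest(Map("old" -> (1L, 256L), "new" -> (2L, 256L)), 512) // List(old, new)
    removeOldest(Map("old" -> (1L, 256L)), 512)                      // List(): cannot be satisfied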
ContainerPoolTests.scala
@@ -129,6 +129,7 @@ class ContainerPoolTests
   it should "reuse a warm container" in within(timeout) {
     val (containers, factory) = testContainers(2)
     val feed = TestProbe()
+    // Actions are created with the default memory limit (MemoryLimit.stdMemory). This means 4 actions can be scheduled.
     val pool = system.actorOf(ContainerPool.props(factory, ContainerPoolConfig(MemoryLimit.stdMemory * 4), feed.ref))

     pool ! runMessage
@@ -143,6 +144,7 @@ class ContainerPoolTests
   it should "reuse a warm container when action is the same even if revision changes" in within(timeout) {
     val (containers, factory) = testContainers(2)
     val feed = TestProbe()
+    // Actions are created with the default memory limit (MemoryLimit.stdMemory). This means 4 actions can be scheduled.
     val pool = system.actorOf(ContainerPool.props(factory, ContainerPoolConfig(MemoryLimit.stdMemory * 4), feed.ref))

     pool ! runMessage
@@ -158,6 +160,7 @@ class ContainerPoolTests
     val (containers, factory) = testContainers(2)
     val feed = TestProbe()

+    // Actions are created with the default memory limit (MemoryLimit.stdMemory). This means 4 actions can be scheduled.
     val pool = system.actorOf(ContainerPool.props(factory, ContainerPoolConfig(MemoryLimit.stdMemory * 4), feed.ref))
     pool ! runMessage
     containers(0).expectMsg(runMessage)
@@ -187,19 +190,19 @@ class ContainerPoolTests
     val (containers, factory) = testContainers(3)
     val feed = TestProbe()

-    // a pool with slots for 512MB
-    val pool = system.actorOf(ContainerPool.props(factory, ContainerPoolConfig(512.MB), feed.ref))
-    pool ! runMessage
+    // a pool with slots for 2 actions with the default memory limit.
+    val pool = system.actorOf(ContainerPool.props(factory, ContainerPoolConfig(MemoryLimit.stdMemory * 2), feed.ref))
+    pool ! runMessage // 1 * stdMemory taken
     containers(0).expectMsg(runMessage)
-    pool ! runMessageDifferentAction
+    pool ! runMessageDifferentAction // 2 * stdMemory taken -> full
     containers(1).expectMsg(runMessageDifferentAction)

-    containers(0).send(pool, NeedWork(warmedData()))
+    containers(0).send(pool, NeedWork(warmedData())) // first action finished -> 1 * stdMemory taken
     feed.expectMsg(MessageFeed.Processed)
-    containers(1).send(pool, NeedWork(warmedData()))
+    containers(1).send(pool, NeedWork(warmedData())) // second action finished -> 1 * stdMemory taken
     feed.expectMsg(MessageFeed.Processed)

-    pool ! runMessageLarge
+    pool ! runMessageLarge // need to remove both actions to make space for the large action (needs 2 * stdMemory)
     containers(0).expectMsg(Remove)
     containers(1).expectMsg(Remove)
     containers(2).expectMsg(runMessageLarge)
@@ -267,15 +270,15 @@ class ContainerPoolTests
     val pool = system.actorOf(ContainerPool.props(factory, ContainerPoolConfig(MemoryLimit.stdMemory * 2), feed.ref))

     // Start first action
-    pool ! runMessage
+    pool ! runMessage // 1 * stdMemory taken
     containers(0).expectMsg(runMessage)

     // Send second action to the pool
-    pool ! runMessageLarge
+    pool ! runMessageLarge // message is too large to be processed immediately.
     containers(1).expectNoMessage(100.milliseconds)

     // First action is finished
-    containers(0).send(pool, NeedWork(warmedData()))
+    containers(0).send(pool, NeedWork(warmedData())) // pool is empty again.
     feed.expectMsg(MessageFeed.Processed)

     // Second action should run now
@@ -581,6 +584,12 @@ class ContainerPoolObjectTests extends FlatSpec with Matchers with MockFactory {
     ContainerPool.remove(pool, MemoryLimit.stdMemory) shouldBe List.empty
   }

+  it should "not provide a container from pool if there is not enough capacity" in {
+    val pool = Map('first -> warmedData())
+
+    ContainerPool.remove(pool, MemoryLimit.stdMemory * 2) shouldBe List.empty
+  }
+
   it should "provide a container from pool with one single free container" in {
     val data = warmedData()
     val pool = Map('warm -> data)