[New Scheduler] Add DataManagementService #5063

Merged
merged 9 commits into from
Mar 18, 2021
Add DataManagementService
style95 committed Mar 11, 2021
commit 31e25a0b4af09034f8ee70c7c0ef804abc7e0259
@@ -27,15 +27,14 @@ import java.util.concurrent.Executors

import org.apache.openwhisk.core.ConfigKeys
import org.apache.openwhisk.core.etcd.EtcdType._
import org.apache.openwhisk.core.service.Lease
import pureconfig.loadConfigOrThrow
import spray.json.DefaultJsonProtocol

import scala.language.implicitConversions
import scala.annotation.tailrec
import scala.concurrent.{ExecutionContextExecutor, Future, Promise}

case class Lease(id: Long, ttl: Long)

object RichListenableFuture {
implicit def convertToFuture[T](lf: ListenableFuture[T])(implicit ece: ExecutionContextExecutor): Future[T] = {
val p = Promise[T]()
@@ -0,0 +1,313 @@
package org.apache.openwhisk.core.service

import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, Props}
import akka.util.Timeout
import io.grpc.StatusRuntimeException
import org.apache.openwhisk.common.Logging
import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdFollower, EtcdLeader}
import org.apache.openwhisk.core.service.DataManagementService.retryInterval

import scala.collection.concurrent.TrieMap
import scala.collection.mutable.{Map, Queue}
import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}
import scala.util.Success

// messages received by the actor
// it is required to specify a recipient directly for the retryable message processing
case class ElectLeader(key: String, value: String, recipient: ActorRef, watchEnabled: Boolean = true)
case class RegisterInitialData(key: String,
value: String,
failoverEnabled: Boolean = true,
recipient: Option[ActorRef] = None)

case class RegisterData(key: String, value: String, failoverEnabled: Boolean = true)
case class DeRegisterData(key: String)
case class UpdateDataOnChange(key: String, value: String)

// messages sent by the actor
case class ElectionResult(leadership: Either[EtcdFollower, EtcdLeader])
case class FinishWork(key: String)
case class InitialDataStorageResults(key: String, result: Either[AlreadyExist, Done])
case class Done()
case class AlreadyExist()

/**
* This service is in charge of storing the given data in ETCD.
* If any issue occurs while storing data, it keeps retrying until the data is stored,
* so the data is guaranteed to be stored eventually.
*/
class DataManagementService(watcherService: ActorRef, workerFactory: ActorRefFactory => ActorRef)(
Review comment (style95, Member, Author, Feb 11, 2021):

I will add test cases to this PR after setting up the CI pipeline for the scheduler components.

implicit logging: Logging,
actorSystem: ActorSystem)
extends Actor {
private implicit val ec = context.dispatcher

Review comment (Member, Author):

This class is used by both schedulers and invokers to store data in ETCD.
The following kinds of data are stored:

  1. Throttling data (action / namespace)
  2. Queue endpoints (where a queue is running)
  3. Scheduler endpoints
  4. Container data (running containers, warmed containers, and data describing how many containers are being created)

Dependent modules include Queue, ContainerProxy, and CreationJobManager.

implicit val requestTimeout: Timeout = Timeout(5.seconds)
private[service] val dataCache = TrieMap[String, String]()
private val operations = Map.empty[String, Queue[Any]]
private var inProgressKeys = Set.empty[String]
private val watcherName = "data-management-service"

private val worker = workerFactory(context)

override def receive: Receive = {
case FinishWork(key) =>
// send waiting operation to worker if there is any, else update the inProgressKeys
val ops = operations.get(key)
if (ops.nonEmpty && ops.get.nonEmpty) {
val operation = ops.get.dequeue()
worker ! operation
} else {
inProgressKeys = inProgressKeys - key
operations.remove(key) // remove the empty queue from the map to free memory
}

// normally these messages will be sent when queues are created.
case request: ElectLeader =>
Review comment (Member, Author):

Leader election happens when a queue is created.
It guarantees that only one scheduler creates a given queue,
so it happens relatively infrequently.

if (inProgressKeys.contains(request.key)) {
Review comment (Member, Author):

Because this component retries operations, if a preceding request for the same key is still in progress (possibly being retried), the new request is stored in a buffer.
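
A minimal, actor-free sketch of this per-key buffering, for illustration only. `KeySerializer`, `submit`, `finish`, `isBusy`, and `dispatch` are hypothetical names that do not appear in this PR; `dispatch` stands in for `worker ! request`, and `finish` plays the role of the `FinishWork` handler.

```scala
import scala.collection.mutable

// Hypothetical model of the buffering logic; not part of the PR.
final class KeySerializer[Op](dispatch: (String, Op) => Unit) {
  private val pending = mutable.Map.empty[String, mutable.Queue[Op]]
  private var inProgress = Set.empty[String]

  // Dispatch immediately unless an operation for this key is already in flight.
  def submit(key: String, op: Op): Unit =
    if (inProgress.contains(key)) {
      pending.getOrElseUpdate(key, mutable.Queue.empty[Op]).enqueue(op)
    } else {
      inProgress += key
      dispatch(key, op)
    }

  // Called when the in-flight operation for `key` has completed.
  def finish(key: String): Unit =
    pending.get(key) match {
      case Some(queue) if queue.nonEmpty =>
        dispatch(key, queue.dequeue()) // the key stays in progress
      case _ =>
        inProgress -= key
        pending.remove(key) // drop the empty queue
    }

  def isBusy(key: String): Boolean = inProgress.contains(key)
}
```

With this shape, at most one operation per key is in flight at a time, while operations on different keys are dispatched independently.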

logging.info(this, s"save request $request into a buffer")
Review comment (Contributor):

Do these really need to be info level?

Review comment (Member, Author):

In this case, the request is stored in a buffer because a preceding request is still being processed. If an issue occurs, this log lets us know whether the request was processed or not.

Review comment (Contributor):

I'm still learning what this is doing, but what does each request involve? Is it sent for every activation, or is it some sort of metadata setup? If it's every activation, it would seem spammy to me; otherwise I think it's fine.

operations.getOrElseUpdate(request.key, Queue.empty[Any]).enqueue(request)
} else {
worker ! request
Review comment (Member, Author):

The actual work is delegated to EtcdWorker.

inProgressKeys = inProgressKeys + request.key
}

case request: RegisterInitialData =>
Review comment (Member, Author):

Actions under the same namespace share some data, such as namespace throttling data.
So the data must be stored if none exists yet, but an existing value must not be overwritten.
This case handles that situation.
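
The store-if-absent semantics can be sketched in memory as follows. This is only an illustration: the PR itself uses `etcdClient.putTxn` against ETCD, whereas this sketch swaps in a `TrieMap`, and `InitialDataSketch`, `Outcome`, `Stored`, `AlreadyExists`, `registerInitial`, and `current` are hypothetical names.

```scala
import scala.collection.concurrent.TrieMap

// Hypothetical in-memory stand-in for the ETCD transaction; not part of the PR.
object InitialDataSketch {
  sealed trait Outcome
  case object Stored extends Outcome
  case object AlreadyExists extends Outcome

  private val store = TrieMap.empty[String, String]

  // Stores the value only when the key is absent; never overwrites an existing value.
  def registerInitial(key: String, value: String): Outcome =
    store.putIfAbsent(key, value) match {
      case None    => Stored
      case Some(_) => AlreadyExists
    }

  def current(key: String): Option[String] = store.get(key)
}
```

The two outcomes correspond to the `Right(Done())` and `Left(AlreadyExist())` results that the worker reports back in `InitialDataStorageResults`.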

// send WatchEndpoint first, as a failed put operation will be retried until it succeeds
if (request.failoverEnabled)
Review comment (Member, Author):

If failover is enabled, the service watches the key; if the key is deleted for some reason, it tries to restore it.

watcherService ! WatchEndpoint(request.key, request.value, isPrefix = false, watcherName, Set(DeleteEvent))
if (inProgressKeys.contains(request.key)) {
logging.info(this, s"save request $request into a buffer")
operations.getOrElseUpdate(request.key, Queue.empty[Any]).enqueue(request)
} else {
worker ! request
inProgressKeys = inProgressKeys + request.key
}

case request: RegisterData =>
Review comment (Member, Author):

This will overwrite the existing data in ETCD.
Generally, this is used for data that is not shared among actions.

// send WatchEndpoint first, as a failed put operation will be retried until it succeeds
if (request.failoverEnabled)
watcherService ! WatchEndpoint(request.key, request.value, isPrefix = false, watcherName, Set(DeleteEvent))
if (inProgressKeys.contains(request.key)) {
// a new put or delete operation supersedes the effects of older put and delete operations,
// so those older operations can be removed from the queue
logging.info(this, s"save request $request into a buffer")
val queue = operations.getOrElseUpdate(request.key, Queue.empty[Any]).filter { value =>
value match {
case _: RegisterData | _: WatcherClosed | _: RegisterInitialData => false
case _ => true
}
}
queue.enqueue(request)
operations.update(request.key, queue)
} else {
worker ! request
inProgressKeys = inProgressKeys + request.key
}

case request: WatcherClosed =>
if (inProgressKeys.contains(request.key)) {
// a new put or delete operation supersedes the effects of older put and delete operations,
// so those older operations can be removed from the queue
logging.info(this, s"save request $request into a buffer")
val queue = operations.getOrElseUpdate(request.key, Queue.empty[Any]).filter { value =>
value match {
case _: RegisterData | _: WatcherClosed | _: RegisterInitialData => false
case _ => true
}
}
queue.enqueue(request)
operations.update(request.key, queue)
} else {
worker ! request
inProgressKeys = inProgressKeys + request.key
}

// The watcher must be closed before the etcd data is deleted.
// The WatcherClosed message is expected to arrive after the watcher is stopped.
case msg: DeRegisterData =>
watcherService ! UnWatchEndpoint(msg.key, isPrefix = false, watcherName, needFeedback = true)

case WatchEndpointRemoved(_, key, value, false) =>
self ! RegisterInitialData(key, value, failoverEnabled = false) // the watcher is already setup

// "prefixed" data is not expected here
case WatchEndpointRemoved(_, key, value, true) =>
logging.error(this, s"unexpected data received: ${WatchEndpoint(key, value, isPrefix = true, watcherName)}")

case msg: UpdateDataOnChange =>
Review comment (Member, Author):

To reduce the load on ETCD, the data is not stored if the value has not changed.
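
A small sketch of this change-detection idea, detached from the actor. `ChangeFilter`, `update`, and `publish` are hypothetical names; `publish` stands in for `self ! RegisterData(...)`.

```scala
import scala.collection.concurrent.TrieMap

// Hypothetical helper that forwards a value only when it differs from the cached one.
final class ChangeFilter(publish: (String, String) => Unit) {
  private val cache = TrieMap.empty[String, String]

  // Returns true if the value was forwarded, false if it was skipped as unchanged.
  def update(key: String, value: String): Boolean =
    if (cache.get(key).contains(value)) {
      false // same value as last time, nothing to write
    } else {
      cache.update(key, value)
      publish(key, value)
      true
    }
}
```

Unlike the handler in the diff, this sketch does not distinguish a first write from a changed value; the PR makes that distinction only to decide whether a watcher still needs to be set up.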

dataCache.get(msg.key) match {
case Some(cached) if cached == msg.value =>
logging.debug(this, s"skip publishing data ${msg.key} because the data is not changed.")
// do nothing

case Some(cached) if cached != msg.value =>
dataCache.update(msg.key, msg.value)
self ! RegisterData(msg.key, msg.value, failoverEnabled = false) // the watcher is already setup

case None =>
dataCache.put(msg.key, msg.value)
self ! RegisterData(msg.key, msg.value)

}
}
}

object DataManagementService {
// Todo: Change to configuration
val retryInterval: FiniteDuration = 1.second
Review comment (Contributor):

Should change to configuration before merging. Is 1 second a good default for this?


def props(watcherService: ActorRef, workerFactory: ActorRefFactory => ActorRef)(implicit logging: Logging,
actorSystem: ActorSystem): Props = {
Props(new DataManagementService(watcherService, workerFactory))
}
}

class EtcdWorker(etcdClient: EtcdClient, leaseService: ActorRef)(implicit val ec: ExecutionContext,
actorSystem: ActorSystem,
logging: Logging)
extends Actor {

private val parent = context.parent
private var lease: Option[Lease] = None
leaseService ! GetLease

override def receive: Receive = {
case msg: Lease =>
lease = Some(msg)

// leader election + endpoint management
case request: ElectLeader =>
lease match {
case Some(l) =>
etcdClient
.electLeader(request.key, request.value, l)
.andThen {
case Success(msg) =>
request.recipient ! ElectionResult(msg)
parent ! FinishWork(request.key)
}
.recover {
// if the lease has expired, reissue it and retry after the retry interval
case t: StatusRuntimeException =>
logging.warn(this, s"a lease is expired while leader election, reissue it: $t")
lease = None
leaseService ! GetLease
sendMessageToSelfAfter(request, retryInterval)

// it should retry forever until the data is stored
case t: Throwable =>
logging.warn(this, s"unexpected error happened: $t, retry storing data")
sendMessageToSelfAfter(request, retryInterval)
}
case None =>
logging.warn(this, s"lease not found, retry storing data")
leaseService ! GetLease
sendMessageToSelfAfter(request, retryInterval)
}

// only endpoint management
case request: RegisterData =>
lease match {
case Some(l) =>
etcdClient
.put(request.key, request.value, l.id)
.andThen {
case Success(_) =>
parent ! FinishWork(request.key)
}
.recover {
// if the lease has expired, reissue it and retry after the retry interval
case t: StatusRuntimeException =>
logging.warn(this, s"a lease is expired while registering data ${request.key}, reissue it: $t")
lease = None
leaseService ! GetLease
sendMessageToSelfAfter(request, retryInterval)

// it should retry forever until the data is stored
case t: Throwable =>
logging.warn(this, s"unexpected error happened: $t, retry storing data ${request.key}")
sendMessageToSelfAfter(request, retryInterval)
}
case None =>
logging.warn(this, s"lease not found, retry storing data ${request.key}")
leaseService ! GetLease
sendMessageToSelfAfter(request, retryInterval)
}

// stores the data only if the key does not exist yet
case request: RegisterInitialData =>
lease match {
case Some(l) =>
etcdClient
.putTxn(request.key, request.value, 0, l.id)
.map { res =>
parent ! FinishWork(request.key)
if (res.getSucceeded) {
logging.debug(this, s"data is stored.")
Review comment (Contributor):

What data is stored?

request.recipient.foreach(_ ! InitialDataStorageResults(request.key, Right(Done())))
} else {
logging.debug(this, s"data is already stored for: $request")
request.recipient.foreach(_ ! InitialDataStorageResults(request.key, Left(AlreadyExist())))
}
}
.recover {
// if the lease has expired, reissue it and retry after the retry interval
case t: StatusRuntimeException =>
logging.warn(
this,
s"a lease is expired while registering an initial data ${request.key}, reissue it: $t")
lease = None
leaseService ! GetLease
sendMessageToSelfAfter(request, retryInterval)

// it should retry forever until the data is stored
case t: Throwable =>
logging.warn(this, s"unexpected error happened: $t, retry storing data for ${request.key}")
sendMessageToSelfAfter(request, retryInterval)
}
case None =>
logging.warn(this, s"lease not found, retry storing data for ${request.key}")
leaseService ! GetLease
sendMessageToSelfAfter(request, retryInterval)
}

case msg: WatcherClosed =>
etcdClient
.del(msg.key)
.andThen {
case Success(_) =>
parent ! FinishWork(msg.key)
}
.recover {
// if the lease has expired, reissue it and retry after the retry interval
case t: StatusRuntimeException =>
logging.warn(this, s"a lease is expired while deleting data ${msg.key}, reissue it: $t")
lease = None
leaseService ! GetLease
sendMessageToSelfAfter(msg, retryInterval)

// it should retry forever until the data is deleted
case t: Throwable =>
logging.warn(this, s"unexpected error happened: $t, retry deleting data for ${msg.key}")
sendMessageToSelfAfter(msg, retryInterval)
}

}

private def sendMessageToSelfAfter(msg: Any, retryInterval: FiniteDuration): Future[Unit] = {
akka.pattern.after(retryInterval, actorSystem.scheduler) {
self ! msg
Future.successful({})
}
}
}

object EtcdWorker {
def props(etcdClient: EtcdClient, leaseService: ActorRef)(implicit ec: ExecutionContext,
actorSystem: ActorSystem,
logging: Logging): Props = {
Props(new EtcdWorker(etcdClient, leaseService))
}
}
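
The worker above retries by re-sending the failed message to itself after `retryInterval` via `akka.pattern.after`. The same retry-until-success idea can be sketched without Akka, under the assumption that a plain JDK scheduled executor is an acceptable substitute for the actor system scheduler. `RetryUntilSuccess` and `retry` are hypothetical names, not part of this PR.

```scala
import java.util.concurrent.{Executors, TimeUnit}
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.concurrent.duration._
import scala.util.{Failure, Success}

// Hypothetical, Akka-free sketch of "retry forever until the operation succeeds".
object RetryUntilSuccess {
  // Daemon thread so the scheduler does not keep the JVM alive.
  private val scheduler = Executors.newSingleThreadScheduledExecutor { r =>
    val t = new Thread(r)
    t.setDaemon(true)
    t
  }

  // Re-runs `op` after `interval` each time it fails; the result completes only on success.
  def retry[T](interval: FiniteDuration)(op: () => Future[T])(
    implicit ec: ExecutionContext): Future[T] = {
    val result = Promise[T]()

    def attempt(): Unit =
      Future.unit.flatMap(_ => op()).onComplete {
        case Success(value) => result.success(value)
        case Failure(_) =>
          scheduler.schedule(new Runnable {
            def run(): Unit = attempt()
          }, interval.toMillis, TimeUnit.MILLISECONDS)
      }

    attempt()
    result.future
  }
}
```

As in the worker, there is no retry limit and no backoff, so a permanently failing operation is retried at a fixed interval indefinitely.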
@@ -52,6 +52,7 @@ case class WatchEndpointInserted(override val watchKey: String,
extends WatchEndpointOperation(watchKey, key, value, isPrefix)
case class WatcherClosed(key: String, isPrefix: Boolean)

// These are abstractions for events from ETCD.
sealed trait EtcdEvent
case object PutEvent extends EtcdEvent
case object DeleteEvent extends EtcdEvent