[New Scheduler] Add DataManagementService #5063

Merged · 9 commits · Mar 18, 2021

Changes from 1 commit
Apply comments
style95 committed Mar 11, 2021
commit 75e200aecf036706b836333ff36ecdc1e8c322c9
5 changes: 5 additions & 0 deletions ansible/group_vars/all
@@ -450,3 +450,8 @@ etcd_connect_string: "{% set ret = [] %}\
{{ ret.append( hostvars[host].ansible_host + ':' + ((etcd.client.port+loop.index-1)|string) ) }}\
{% endfor %}\
{{ ret | join(',') }}"

scheduler:
  dataManagementService:
    retryInterval: "{{ scheduler_dataManagementService_retryInterval | default('1 second') }}"

@@ -294,4 +294,6 @@ object ConfigKeys {
val schedulerMaxPeek = "whisk.scheduler.max-peek"

val whiskClusterName = "whisk.cluster.name"

val dataManagementServiceRetryInterval = "whisk.scheduler.data-management-service.retryInterval"
}
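For context, a minimal sketch of how this key is consumed, assuming the usual pureconfig flow: the ansible default above is rendered into the scheduler's HOCON config under this key, and loadConfigOrThrow parses the duration string into a FiniteDuration. The HOCON line below is illustrative, not the project's actual reference.conf:

import com.typesafe.config.ConfigFactory
import pureconfig.loadConfigOrThrow
import scala.concurrent.duration.FiniteDuration

object RetryIntervalSketch extends App {
  // Illustrative HOCON mirroring the key added above; in the real deployment
  // the value is rendered by ansible from scheduler_dataManagementService_retryInterval.
  val config = ConfigFactory.parseString(
    "whisk.scheduler.data-management-service.retryInterval = 1 second")

  // pureconfig understands Typesafe duration strings such as "1 second".
  val retryInterval: FiniteDuration =
    loadConfigOrThrow[FiniteDuration](config, "whisk.scheduler.data-management-service.retryInterval")

  println(retryInterval) // 1 second
}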
@@ -4,13 +4,15 @@ import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, Props}
import akka.util.Timeout
import io.grpc.StatusRuntimeException
import org.apache.openwhisk.common.Logging
import org.apache.openwhisk.core.ConfigKeys
import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdFollower, EtcdLeader}
import org.apache.openwhisk.core.service.DataManagementService.retryInterval
import pureconfig.loadConfigOrThrow

import scala.collection.concurrent.TrieMap
import scala.collection.mutable.{Map, Queue}
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}
import scala.util.Success

// messages received by the actor
@@ -22,7 +24,7 @@ case class RegisterInitialData(key: String,
recipient: Option[ActorRef] = None)

case class RegisterData(key: String, value: String, failoverEnabled: Boolean = true)
-case class DeRegisterData(key: String)
+case class UnregisterData(key: String)
case class UpdateDataOnChange(key: String, value: String)

// messages sent by the actor
@@ -66,15 +68,15 @@ class DataManagementService(watcherService: ActorRef, workerFactory: ActorRefFac
// normally these messages will be sent when queues are created.
case request: ElectLeader =>
Member Author: Leader election happens when a queue is created; this guarantees that only one scheduler creates a given queue, so elections happen relatively infrequently.
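As a rough illustration of that flow, here is a hypothetical caller (not part of this diff). It assumes ElectLeader(key, value, recipient) and ElectionResult wrapping an Either[EtcdFollower, EtcdLeader], which is what the imports and usages in this file suggest:

import akka.actor.{Actor, ActorRef}
import org.apache.openwhisk.core.etcd.{EtcdFollower, EtcdLeader}

// Hypothetical queue creator: asks DataManagementService to run a leader
// election for the queue's key and creates the queue only when it wins.
class QueueCreator(dataManagementService: ActorRef) extends Actor {
  private val queueKey = "namespace/action/leader" // illustrative key

  override def preStart(): Unit =
    dataManagementService ! ElectLeader(queueKey, "scheduler0:8080", recipient = self)

  override def receive: Receive = {
    case ElectionResult(Right(_: EtcdLeader)) =>
      createQueue(queueKey) // this scheduler won and creates the queue
    case ElectionResult(Left(_: EtcdFollower)) =>
      () // another scheduler won; it will create the queue
  }

  private def createQueue(key: String): Unit = ()
}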

if (inProgressKeys.contains(request.key)) {
Member Author: Because this component retries failed operations, a new request for a key that already has an in-flight (retrying) request is stored in a buffer rather than executed immediately.

logging.info(this, s"save request $request into a buffer")
logging.info(this, s"save a request $request into a buffer")
operations.getOrElseUpdate(request.key, Queue.empty[Any]).enqueue(request)
} else {
worker ! request
Member Author: The actual work is delegated to EtcdWorker.

inProgressKeys = inProgressKeys + request.key
}
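The buffering above, together with the FinishWork dequeue logic elsewhere in this class, amounts to serializing operations per key through a single worker. A self-contained sketch of that pattern (illustrative names, not the PR's code):

import scala.collection.mutable

// At most one operation per key runs at a time; later operations for the
// same key wait in a queue until finish(key) is called for the current one.
final class PerKeySerializer[Op](run: Op => Unit) {
  private val inProgress = mutable.Set.empty[String]
  private val pending = mutable.Map.empty[String, mutable.Queue[Op]]

  def submit(key: String, op: Op): Unit =
    if (inProgress.contains(key)) {
      pending.getOrElseUpdate(key, mutable.Queue.empty[Op]).enqueue(op) // buffer it
    } else {
      inProgress += key
      run(op)
    }

  def finish(key: String): Unit =
    pending.get(key).filter(_.nonEmpty).map(_.dequeue()) match {
      case Some(next) => run(next) // key stays in progress
      case None =>
        inProgress -= key
        pending.remove(key)
    }
}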

case request: RegisterInitialData =>
Member Author: Actions under the same namespace share some data, such as namespace throttling data. So the data must be stored when it does not yet exist, without overwriting an existing entry; this case handles that. The putTxn call in EtcdWorker below succeeds only when the key is absent.
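Conceptually this is etcd's put-if-absent transaction: compare the key's version to 0 (an absent key) and only then put. A sketch using the jetcd client, which is an assumption here since OpenWhisk wraps etcd behind its own EtcdClient:

import java.nio.charset.StandardCharsets.UTF_8
import io.etcd.jetcd.{ByteSequence, Client}
import io.etcd.jetcd.op.{Cmp, CmpTarget, Op}
import io.etcd.jetcd.options.PutOption

// Put-if-absent: the transaction succeeds only when the key does not
// exist yet (its version is 0), so existing data is never overwritten.
def putIfAbsent(client: Client, key: String, value: String): Boolean = {
  val k = ByteSequence.from(key, UTF_8)
  val v = ByteSequence.from(value, UTF_8)
  val response = client.getKVClient
    .txn()
    .If(new Cmp(k, Cmp.Op.EQUAL, CmpTarget.version(0))) // key absent?
    .Then(Op.put(k, v, PutOption.DEFAULT))              // then store it
    .commit()
    .get()
  response.isSucceeded // false => the data already existed
}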

-// send WatchEndpoint first as the put operation will be retry until success if failed
+// send WatchEndpoint first as the put operation will be retried until success if failed
if (request.failoverEnabled)
Member Author: If failover is enabled, the service watches the key, and if the key is deleted for any reason, it tries to restore it. (See the WatchEndpointRemoved handler below, which re-registers the data with failoverEnabled = false since the watcher is already set up.)

watcherService ! WatchEndpoint(request.key, request.value, isPrefix = false, watcherName, Set(DeleteEvent))
if (inProgressKeys.contains(request.key)) {
@@ -108,8 +110,9 @@ class DataManagementService(watcherService: ActorRef, workerFactory: ActorRefFac

case request: WatcherClosed =>
if (inProgressKeys.contains(request.key)) {
-// the new put|delete operation will erase influences made by older operations like put&delete
-// so we can remove these old operations
+// The put|delete operations against the same key will overwrite the previous results.
+// For example, if we put a value, delete it and put a new value again, the final result will be the new value.
+// So we can remove these old operations
logging.info(this, s"save request $request into a buffer")
val queue = operations.getOrElseUpdate(request.key, Queue.empty[Any]).filter { value =>
value match {
@@ -126,8 +129,8 @@ class DataManagementService(watcherService: ActorRef, workerFactory: ActorRefFac

// It is required to close the watcher first before deleting etcd data
// It is supposed to receive the WatcherClosed message after the watcher is stopped.
-case msg: DeRegisterData =>
-watcherService ! UnWatchEndpoint(msg.key, isPrefix = false, watcherName, needFeedback = true)
+case msg: UnregisterData =>
+watcherService ! UnwatchEndpoint(msg.key, isPrefix = false, watcherName, needFeedback = true)

case WatchEndpointRemoved(_, key, value, false) =>
self ! RegisterInitialData(key, value, failoverEnabled = false) // the watcher is already setup
@@ -140,7 +143,6 @@ class DataManagementService(watcherService: ActorRef, workerFactory: ActorRefFac
dataCache.get(msg.key) match {
case Some(cached) if cached == msg.value =>
logging.debug(this, s"skip publishing data ${msg.key} because the data is not changed.")
-// do nothing

case Some(cached) if cached != msg.value =>
dataCache.update(msg.key, msg.value)
@@ -155,21 +157,20 @@ class DataManagementService(watcherService: ActorRef, workerFactory: ActorRefFac
}

object DataManagementService {
-// Todo: Change to configuration
-val retryInterval: FiniteDuration = 1.second
+val retryInterval: FiniteDuration = loadConfigOrThrow[FiniteDuration](ConfigKeys.dataManagementServiceRetryInterval)

def props(watcherService: ActorRef, workerFactory: ActorRefFactory => ActorRef)(implicit logging: Logging,
actorSystem: ActorSystem): Props = {
Props(new DataManagementService(watcherService, workerFactory))
}
}

-class EtcdWorker(etcdClient: EtcdClient, leaseService: ActorRef)(implicit val ec: ExecutionContext,
+private[service] class EtcdWorker(etcdClient: EtcdClient, leaseService: ActorRef)(implicit val ec: ExecutionContext,
actorSystem: ActorSystem,
logging: Logging)
extends Actor {

-private val parent = context.parent
+private val dataManagementService = context.parent
private var lease: Option[Lease] = None
leaseService ! GetLease

@@ -186,7 +187,7 @@ class EtcdWorker(etcdClient: EtcdClient, leaseService: ActorRef)(implicit val ec
.andThen {
case Success(msg) =>
request.recipient ! ElectionResult(msg)
-parent ! FinishWork(request.key)
+dataManagementService ! FinishWork(request.key)
}
.recover {
// if there is no lease, reissue it and retry immediately
@@ -215,7 +216,7 @@ class EtcdWorker(etcdClient: EtcdClient, leaseService: ActorRef)(implicit val ec
.put(request.key, request.value, l.id)
.andThen {
case Success(_) =>
-parent ! FinishWork(request.key)
+dataManagementService ! FinishWork(request.key)
}
.recover {
// if there is no lease, reissue it and retry immediately
@@ -243,12 +244,12 @@ class EtcdWorker(etcdClient: EtcdClient, leaseService: ActorRef)(implicit val ec
etcdClient
.putTxn(request.key, request.value, 0, l.id)
.map { res =>
-parent ! FinishWork(request.key)
+dataManagementService ! FinishWork(request.key)
if (res.getSucceeded) {
logging.debug(this, s"data is stored.")
logging.info(this, s"initial data storing succeeds for ${request.key}")
request.recipient.map(_ ! InitialDataStorageResults(request.key, Right(Done())))
} else {
logging.debug(this, s"data is already stored for: $request")
logging.info(this, s"data is already stored for: $request, cancel the initial data storing")
request.recipient.map(_ ! InitialDataStorageResults(request.key, Left(AlreadyExist())))
}
}
@@ -278,7 +279,7 @@ class EtcdWorker(etcdClient: EtcdClient, leaseService: ActorRef)(implicit val ec
.del(msg.key)
.andThen {
case Success(_) =>
-parent ! FinishWork(msg.key)
+dataManagementService ! FinishWork(msg.key)
}
.recover {
// if there is no lease, reissue it and retry immediately
@@ -296,11 +297,8 @@ class EtcdWorker(etcdClient: EtcdClient, leaseService: ActorRef)(implicit val ec

}

-private def sendMessageToSelfAfter(msg: Any, retryInterval: FiniteDuration): Future[Unit] = {
-  akka.pattern.after(retryInterval, actorSystem.scheduler) {
-    self ! msg
-    Future.successful({})
-  }
+private def sendMessageToSelfAfter(msg: Any, retryInterval: FiniteDuration) = {
+  actorSystem.scheduler.scheduleOnce(retryInterval, self, msg)
}
}
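The change above swaps akka.pattern.after, which allocates a Future just to send a message, for the scheduler's scheduleOnce, which delivers the message directly; this is also why the Future import could be dropped. A minimal, self-contained sketch of the pattern (illustrative actor, not OpenWhisk code):

import akka.actor.{Actor, ActorSystem, Props}
import scala.concurrent.duration._

// Retry by re-sending the failed message to self after a fixed delay.
class RetryingActor extends Actor {
  import context.dispatcher // ExecutionContext required by scheduleOnce

  def receive: Receive = {
    case "work" =>
      if (!tryWork())
        context.system.scheduler.scheduleOnce(1.second, self, "work") // retry later
  }

  private def tryWork(): Boolean = scala.util.Random.nextBoolean()
}

object RetryingActorExample extends App {
  val system = ActorSystem("retry-example")
  system.actorOf(Props(new RetryingActor)) ! "work"
}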
