Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GAS communication logic in Akka VertexEntities #10

Merged
merged 4 commits into from
Nov 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/main/scala/com/cluster/graph/ClusterShardingApp.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import com.cluster.graph.entity.{EntityId, VertexEntityType}
import com.typesafe.config.{Config, ConfigFactory}

import scala.collection.mutable.ArrayBuffer
import com.cluster.graph.entity.{EntityId, VertexEntityType}

object ClusterShardingApp {

Expand Down
8 changes: 5 additions & 3 deletions src/main/scala/com/cluster/graph/EntityManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import akka.cluster.sharding.typed.scaladsl.{ClusterSharding, Entity, EntityRef}
import akka.cluster.typed.Cluster
import akka.util.Timeout
import com.CborSerializable
import com.cluster.graph.GlobalCoordinator.GlobalCoordinatorKey
import com.cluster.graph.Init.{blockInitMain, blockInitMirror, blockInitPartitionCoordinator}
import com.cluster.graph.entity._
import com.preprocessing.partitioning.oneDim.Main
Expand Down Expand Up @@ -154,17 +153,20 @@ class EntityManager(
sharding.entityRefFor(VertexEntity.TypeKey, eid.toString)
// initialize all mirrors of main // TODO Review main check is needed anymore
if (isMain(eid)) {
val mainERef: EntityRef[MainEntity.Initialize] =
val mainERef: EntityRef[VertexEntity.Initialize] =
sharding.entityRefFor(VertexEntity.TypeKey, eid.toString)
val mirrors = mainArray(eid.vertexId).mirrors.map(m =>
new EntityId(VertexEntityType.Mirror.toString(), m.id, m.partition.id)
)
// TODO Pass partitionInDegree to all vertices being created
totalMainsInitialized =
blockInitMain(mainERef, eid, neighbors, mirrors, totalMainsInitialized)
for (m <- mirrors) {
val mirrorERef: EntityRef[VertexEntity.Command] =
sharding.entityRefFor(VertexEntity.TypeKey, m.toString)
totalMirrorsInitialized = blockInitMirror(mirrorERef, m, eid, totalMirrorsInitialized)
// TODO Need to add neighbours
val neighbors = ArrayBuffer[EntityId]()
totalMirrorsInitialized = blockInitMirror(mirrorERef, m, eid, neighbors, totalMirrorsInitialized)
}
}
}
Expand Down
21 changes: 12 additions & 9 deletions src/main/scala/com/cluster/graph/Init.scala
Original file line number Diff line number Diff line change
Expand Up @@ -290,15 +290,15 @@ object Init {
* @param mirrors
*/
def blockInitMain(
mainERef: EntityRef[MainEntity.Initialize],
mainERef: EntityRef[VertexEntity.Initialize],
eid: EntityId,
neighbors: ArrayBuffer[EntityId],
mirrors: ArrayBuffer[EntityId],
totalMainsInitialized: Int
): Int = {
// async call to initialize main
val future: Future[MainEntity.InitializeResponse] = mainERef.ask(ref =>
MainEntity.Initialize(
val future: Future[VertexEntity.InitializeResponse] = mainERef.ask(ref =>
VertexEntity.Initialize(
eid.vertexId,
eid.partitionId,
neighbors,
Expand All @@ -309,7 +309,7 @@ object Init {
// blocking to wait until main vertex is initialized
val mainInitResult = Await.result(future, waitTime)
mainInitResult match {
case MainEntity.InitializeResponse(_) =>
case VertexEntity.InitializeResponse(_) =>
totalMainsInitialized + 1
case _ =>
println(s"Failed to Initialize Main ${eid.vertexId}_${eid.partitionId}")
Expand All @@ -321,27 +321,30 @@ object Init {
* initialized
*
* @param mirrorERef
* @param m
* @param eid
* @param m Entity id of the mirror vertex
* @param eid Entity id of its main vertex
* @param neighbors List of neighbours
*/
def blockInitMirror(
mirrorERef: EntityRef[VertexEntity.Command],
m: EntityId,
eid: EntityId,
neighbors: ArrayBuffer[EntityId],
totalMirrorsInitialized: Int
): Int = {
val future: Future[MirrorEntity.InitializeResponse] = mirrorERef.ask(ref =>
MirrorEntity.InitializeMirror(
val future: Future[VertexEntity.InitializeResponse] = mirrorERef.ask(ref =>
VertexEntity.InitializeMirror(
m.vertexId,
m.partitionId,
eid,
neighbors,
ref
)
)
// blocking to wait until mirror vertex is initialized
val mirrorInitResult = Await.result(future, waitTime)
mirrorInitResult match {
case MirrorEntity.InitializeResponse(_) =>
case VertexEntity.InitializeResponse(_) =>
totalMirrorsInitialized + 1
case _ =>
println(s"Failed to Initialize Main ${eid.vertexId}_${eid.partitionId}")
Expand Down
6 changes: 3 additions & 3 deletions src/main/scala/com/cluster/graph/PartitionCoordinator.scala
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,11 @@ class PartitionCoordinator(
def blockBroadcastLocation(
mainERef: EntityRef[VertexEntity.Command]
): Unit = {
val future: Future[MainEntity.AckPCLocation] =
mainERef.ask(ref => MainEntity.StorePCRef(pcRef, ref))
val future: Future[VertexEntity.AckPCLocation] =
mainERef.ask(ref => VertexEntity.StorePCRef(pcRef, ref))
val broadcastResult = Await.result(future, waitTime)
broadcastResult match {
case MainEntity.AckPCLocation() =>
case VertexEntity.AckPCLocation() =>
nMainsAckd += 1
case _ =>
println(s"${mainERef} failed to acknowledge ${pcRef}'s location'")
Expand Down
30 changes: 27 additions & 3 deletions src/main/scala/com/cluster/graph/entity/EntityId.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,38 @@ final class EntityId(val eType: String, val vertexId: Int, val partitionId: Int)
// A way of resolving the entity TypeKey
def getTypeKey(): EntityTypeKey[VertexEntity.Command] = {
VertexEntityType.withName(eType) match {
case VertexEntityType.Main => MainEntity.TypeKey
case VertexEntityType.Main => MainEntity.TypeKey
case VertexEntityType.Mirror => MirrorEntity.TypeKey
// case _ =>
// Logging.Error(this.toString(), Logging.classFor(Logging.ErrorLevel),s"Unknown entity type: ${eType}")
// null
}
}


// A way of resolving the entity class
// def getEntityClass(): Class[_ <: VertexEntity] = {
// VertexEntityType.withName(eType) match {
// case VertexEntityType.Main => classOf[MainEntity]
// case VertexEntityType.Mirror => classOf[MirrorEntity]
// // case _ =>
// // Logging.Error(this.toString(), Logging.classFor(Logging.ErrorLevel),s"Unknown entity type: ${eType}")
// // null
// }
// }
}
object EntityId {
def getTypeFromString(entityId: String): String = {
entityId.split("_").head
}
}

// A way of resolving the entity class based on string entity id
// def getEntityClassFromString(entityId: String): Class[_ <: VertexEntity] = {
// VertexEntityType.withName(getTypeFromString(entityId)) match {
// case VertexEntityType.Main => classOf[MainEntity]
// case VertexEntityType.Mirror => classOf[MirrorEntity]
// // case _ =>
// // Logging.Error(this.toString(), Logging.classFor(Logging.ErrorLevel),s"Unknown entity type: ${eType}")
// // null
// }
// }
}
152 changes: 92 additions & 60 deletions src/main/scala/com/cluster/graph/entity/MainEntity.scala
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
package com.cluster.graph.entity

import akka.actor.typed.scaladsl.{AbstractBehavior, ActorContext, Behaviors}
import akka.actor.typed.{ActorRef, Behavior}
import scala.collection.mutable
import scala.concurrent.duration._
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable
import akka.actor.typed.{Behavior, ActorRef}
import akka.actor.typed.scaladsl.{Behaviors, AbstractBehavior, ActorContext}
import akka.cluster.sharding.typed.scaladsl.{ClusterSharding, EntityContext, EntityTypeKey}
import akka.cluster.typed.Cluster
import com.CborSerializable
import com.cluster.graph.PartitionCoordinator

import scala.collection.mutable.ArrayBuffer
import scala.concurrent.duration._
import com.cluster.graph.PartitionCoordinator
import VertexEntity._

// Vertex actor
class MainEntity(
Expand All @@ -18,24 +19,33 @@ class MainEntity(
) extends AbstractBehavior[VertexEntity.Command](ctx)
with VertexEntity {

import MainEntity._

// In order for vertices to be able to send messages, they need to sharding.entityRefFor by entity id
val cluster = Cluster(ctx.system)
val sharding = ClusterSharding(ctx.system)
var vertexId = 0
var partitionId = 0
var value = 0 // Counter TEST ONLY
// TODO neighbourCounter, summedTotal
private var neighbors: ArrayBuffer[EntityId] = null
private var mirrors: ArrayBuffer[EntityId] = null
private var pcRef: ActorRef[PartitionCoordinator.Command] = null

val mirrorCounter: mutable.Map[SuperStep, Int] = new mutable.HashMap()
var active: Boolean = vertexProgram.defaultActivationStatus
var currentValue: VertexValT = vertexProgram.defaultVertexValue
val okToProceed: mutable.Map[SuperStep, Boolean] = new mutable.HashMap()

var value = 0 // Counter TEST ONLY

// In order for vertices find refs for messages, they need to sharding.entityRefFor by entity id
val sharding = ClusterSharding(ctx.system)

override def ctxLog(event: String): Unit = {
ctx.log.info(
s"******************{} ${event} at {}, eid: {}",
ctx.self.path,
nodeAddress,
entityContext.entityId
)
}

override def onMessage(
msg: VertexEntity.Command
): Behavior[VertexEntity.Command] = {
msg match {
case Initialize(vid, pid, neigh, mrs, replyTo) =>
case VertexEntity.Initialize(vid, pid, neigh, mrs, replyTo) =>
vertexId = vid
partitionId = pid.toShort
neighbors = neigh
Expand All @@ -53,22 +63,43 @@ class MainEntity(
Behaviors.same

// GAS Actions
case VertexEntity.Begin =>
ctxLog("Beginning compute")
case VertexEntity.Begin(0) => {
ctxLog("Beginning compute: Step 0")
applyAndScatter(0, None)
Behaviors.same
}

case VertexEntity.Begin(stepNum) => {
ctxLog(s"Beginning compute: Step ${stepNum}")
okToProceed(stepNum) = true
applyIfReady(stepNum)
value += 1
Behaviors.same
}

case VertexEntity.End =>
ctxLog("Ordered to stop " + msg)
// TODO Implement
// TODO Needed?
Behaviors.same
case VertexEntity.NeighbourMessage(stepNum, msg) =>
ctxLog("Received neighbour msg " + msg)
// TODO Implement
Behaviors.same
case MirrorTotal(stepNum, total) =>
ctxLog("Received mirror total " + total)
// TODO Implement

case c: VertexEntity.NeighbourMessage => reactToNeighbourMessage(c)

case MirrorTotal(stepNum, mirrorTotal) => {
ctxLog("Received mirror total " + mirrorTotal)
mirrorTotal match {
case None => ()
case Some(mirrorTotal) => {
val newTotal = summedTotal.get(stepNum) match {
case None => mirrorTotal
case Some(existingTotal) => vertexProgram.sum(existingTotal, mirrorTotal)
}
summedTotal.update(stepNum, newTotal)
}
}
mirrorCounter.update(stepNum, mirrorCounter.getOrElse(stepNum, 0) + 1)
applyIfReady(stepNum)
Behaviors.same
}

case VertexEntity.Idle =>
entityContext.shard ! ClusterSharding.Passivate(ctx.self)
Expand All @@ -95,13 +126,39 @@ class MainEntity(
}
}

def ctxLog(event: String) {
ctx.log.info(
s"******************{} ${event} at {}, eid: {}",
ctx.self.path,
nodeAddress,
entityContext.entityId
)
// Perform apply and scatter phases
def applyAndScatter(stepNum: SuperStep, total: Option[AccumulatorT]): Unit = {
(active, total) match {
case (false, None) => {
// Vote to terminate
pcRef ! PartitionCoordinator.TerminationVote(stepNum) // TODO change to new PC command
}
case _ => {
// Continue
val newVal = vertexProgram.apply(stepNum, vertexId, currentValue, total)
val oldVal = currentValue
currentValue = newVal
val cmd = ApplyResult(stepNum, oldVal, newVal)
for (mirror <- mirrors) {
val mirrorRef = sharding.entityRefFor(mirror.getTypeKey(), mirror.toString())
mirrorRef ! cmd
}
active = !vertexProgram.voteToHalt(oldVal, newVal)
localScatter(stepNum, oldVal, newVal, sharding)
pcRef ! PartitionCoordinator.DONE(stepNum) // TODO change to new PC command
}
}

}

override def applyIfReady(stepNum: SuperStep): Unit = {
if (
okToProceed(stepNum) &&
mirrorCounter(stepNum) == mirrors.length &&
neighbourCounter(stepNum) == partitionInDegree
) {
applyAndScatter(stepNum, summedTotal.get(stepNum))
}
}
}

Expand All @@ -118,29 +175,4 @@ object MainEntity {
new MainEntity(ctx, nodeAddress, entityContext)
})
}

// Orchestration
sealed trait Response extends CborSerializable

// Init Sync Command
final case class Initialize(
vertexId: Int,
partitionId: Int,
neighbors: ArrayBuffer[EntityId],
mirrors: ArrayBuffer[EntityId],
replyTo: ActorRef[InitializeResponse]
) extends VertexEntity.Command

final case class StorePCRef(
pcRef: ActorRef[PartitionCoordinator.Command],
replyTo: ActorRef[AckPCLocation]
) extends VertexEntity.Command

// Init Sync Response
final case class InitializeResponse(message: String) extends Response

final case class AckPCLocation() extends Response

// GAS
final case class MirrorTotal(stepNum: Int, total: Int) extends VertexEntity.Command
}
Loading