Skip to content

Commit

Permalink
Initial implementation of final value collecting
Browse files Browse the repository at this point in the history
  • Loading branch information
Joseph Poremba authored and Joseph Poremba committed Nov 23, 2021
1 parent 9a01498 commit 6bc0279
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 7 deletions.
43 changes: 42 additions & 1 deletion src/main/scala/com/cluster/graph/GlobalCoordinator.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ import com.Typedefs.{GCRef, PCRef}

import scala.concurrent.duration.DurationInt
import scala.concurrent.{Await, Future}
import com.cluster.graph.entity.VertexEntity
import com.algorithm.VertexProgram
import akka.stream.javadsl.Partition

/** The central coordinator monitors the state of every vertex in the graph (via the
* partitionCoordinator), to ensure all vertices are performing the computation for the same
Expand Down Expand Up @@ -40,6 +43,9 @@ class GlobalCoordinator(ctx: ActorContext[GlobalCoordinator.Command])
var numPartitions = -1
var numNodes = -1

val finalValues = collection.mutable.Map[Int, VertexEntity.VertexValT]()
var nPCFinalValues = 0

def globallyDone(stepNum: Int): Boolean = {
doneCounter(stepNum) + voteCounter(stepNum) == numPartitions
}
Expand Down Expand Up @@ -73,7 +79,8 @@ class GlobalCoordinator(ctx: ActorContext[GlobalCoordinator.Command])
println(s"gc : step ${stepNum}: done counter${doneCounter(stepNum)}")

if (globallyDone(stepNum)) {
println("globally done")
println("globally done =========================================================================================================================================")
println(s"beginning superstep ${stepNum + 1}")
broadcastBEGINToPCs(stepNum + 1)
}
Behaviors.same
Expand All @@ -84,6 +91,10 @@ class GlobalCoordinator(ctx: ActorContext[GlobalCoordinator.Command])
if (globallyTerminated(stepNum)) {
println("TERMINATION")
//TODO TERMINATE..?
nPCFinalValues = 0
for((pid, pcRef) <- pcRefs) {
pcRef ! PartitionCoordinator.GetFinalValues
}
} else if (globallyDone(stepNum)) {
broadcastBEGINToPCs(stepNum + 1)
}
Expand Down Expand Up @@ -111,6 +122,35 @@ class GlobalCoordinator(ctx: ActorContext[GlobalCoordinator.Command])
replyTo ! GetPCRefsResponse(pcRefs)
Behaviors.same

case FinalValues(valueMap) => {
// println(s"Received map: ${valueMap}")
// println("Foo -1")
// println(s"Final Values Map: ${finalValues}")
// println("Foo 0")
finalValues ++= valueMap
// println("Foo 0.5")
// println(s"Final Values Map: ${finalValues}")
// println("Foo 1")
// println(s"Final Values Map: ${finalValues}")
// println("Foo 2")
nPCFinalValues += 1
// for((key, value) <- valueMap) {
// println(s"Key: ${key}, Value: ${value}")
// finalValues.update(key, value)
// }
// println("Foo 3")

// finalValues ++= valueMap
// println(s"Final Values: ${finalValues}")
if(nPCFinalValues == numPartitions) {
println("Final Values:")
finalValues.toMap.foreach(x => println(s"Vertex: ${x._1}, Value: ${x._2}"))
// for((vtx, value) <- finalValues) {
// println(s"${vtx} -> ${value}")
// }
}
Behaviors.same
}
}
}
}
Expand All @@ -135,6 +175,7 @@ object GlobalCoordinator {
final case class DONE(stepNum: Int) extends Command
final case class TerminationVote(stepNum: Int) extends Command
final case class BEGIN() extends Command
final case class FinalValues(valueMap: collection.immutable.Map[Int, VertexEntity.VertexValT]) extends Command

// Init Sync Commands
final case class GetPCRefs(replyTo: ActorRef[GetPCRefsResponse]) extends Command
Expand Down
22 changes: 22 additions & 0 deletions src/main/scala/com/cluster/graph/PartitionCoordinator.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import com.cluster.graph.entity.{EntityId, MainEntity, VertexEntity}
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.duration.DurationInt
import scala.concurrent.{Await, Future}
import com.algorithm.VertexProgram

/** PartitionCoordinator Actor Exists on each partition of the graph, is aware of all the main
* vertices on its partition When a main vertex completes its computation for the current
Expand Down Expand Up @@ -52,6 +53,8 @@ class PartitionCoordinator(
// counts the number of vertices in this partition that have voted to terminate their computation
var voteCounter = collection.mutable.Map[Int, Int]().withDefaultValue(0)

val finalValues = collection.mutable.Map[Int, VertexEntity.VertexValT]()

def locallyDone(stepNum: Int): Boolean = {
doneCounter(stepNum) + voteCounter(stepNum) == nMains
}
Expand Down Expand Up @@ -134,6 +137,23 @@ class PartitionCoordinator(
case AdaptedResponse(message) =>
ctx.log.info("Got response from hal: {}", message)
Behaviors.same

case FinalValue(vtx, value) => {
println(s"Received value: ${vtx} -> ${value}")
finalValues += (vtx -> value)
if(finalValues.size == nMains) {
gcRef ! GlobalCoordinator.FinalValues(finalValues.toMap)
}
Behaviors.same
}

case GetFinalValues => {
for (m <- mains) {
val eRef = sharding.entityRefFor(VertexEntity.TypeKey, m.toString)
eRef ! VertexEntity.GetFinalValue
}
Behaviors.same
}
}
}
}
Expand Down Expand Up @@ -179,6 +199,8 @@ object PartitionCoordinator {
final case class DONE(stepNum: Int) extends Command
final case class TerminationVote(stepNum: Int) extends Command
final case class BEGIN(stepNum: Int) extends Command
final case object GetFinalValues extends Command
final case class FinalValue(vtx: Int, value: VertexEntity.VertexValT) extends Command

private case class AdaptedResponse(message: String) extends Command
case object Idle extends Command
Expand Down
5 changes: 5 additions & 0 deletions src/main/scala/com/cluster/graph/entity/MainEntity.scala
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,11 @@ class MainEntity(
Behaviors.same
}

case GetFinalValue => {
pcRef ! PartitionCoordinator.FinalValue(vertexId, currentValue)
Behaviors.same
}

case VertexEntity.Idle =>
entityContext.shard ! ClusterSharding.Passivate(ctx.self)
Behaviors.same
Expand Down
13 changes: 7 additions & 6 deletions src/main/scala/com/cluster/graph/entity/VertexEntity.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@ import akka.actor.typed.{ActorRef, Behavior}
import akka.actor.typed.scaladsl.Behaviors
import akka.cluster.sharding.typed.scaladsl.{ClusterSharding, EntityContext, EntityTypeKey}
import com.CborSerializable
import com.algorithm.LocalMaximaColouring
import com.algorithm.Colour
import com.algorithm._
import com.cluster.graph.PartitionCoordinator

object VertexEntity {
Expand All @@ -20,6 +19,8 @@ object VertexEntity {
type VertexValT = Option[Colour]
type SuperStep = Int

// Commands

sealed trait Command extends CborSerializable
trait Response extends CborSerializable

Expand All @@ -28,11 +29,14 @@ object VertexEntity {

val TypeKey = EntityTypeKey[VertexEntity.Command]("VertexEntity")

// GAS General Commands
// GAS Commands
case class Begin(stepNum: Int) extends Command
case object End extends Command
final case class NeighbourMessage(stepNum: Int, edgeVal: Option[EdgeValT], msg: Option[MessageT])
extends Command
final case class MirrorTotal(stepNum: Int, total: Option[AccumulatorT]) extends Command
final case class ApplyResult(stepNum: Int, oldVal: VertexValT, newVal: Option[VertexValT]) extends Command
final case object GetFinalValue extends Command

// PartitionCoordinator Commands
final case class NotifyLocation(replyTo: ActorRef[LocationResponse]) extends Command
Expand Down Expand Up @@ -64,9 +68,6 @@ object VertexEntity {
// Init Sync Response
final case class InitializeResponse(message: String) extends Response

// GAS
final case class MirrorTotal(stepNum: Int, total: Option[AccumulatorT]) extends Command
final case class ApplyResult(stepNum: Int, oldVal: VertexValT, newVal: Option[VertexValT]) extends Command

// Counter actions TESTING ONLY
case object Increment extends Command
Expand Down

0 comments on commit 6bc0279

Please sign in to comment.