Skip to content

Commit

Permalink
Merge pull request #15 from atrostan/finalVals
Browse files Browse the repository at this point in the history
Final vals
  • Loading branch information
jporemba committed Nov 26, 2021
2 parents 9a01498 + 9a48be5 commit e84d506
Show file tree
Hide file tree
Showing 5 changed files with 115 additions and 7 deletions.
34 changes: 34 additions & 0 deletions src/main/scala/com/cluster/graph/ClusterShardingApp.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,17 @@ import com.cluster.graph.Init._
import com.cluster.graph.PartitionCoordinator.BroadcastLocation
import com.cluster.graph.entity.{EntityId, VertexEntityType}
import com.typesafe.config.{Config, ConfigFactory}
import akka.actor.typed.scaladsl.AskPattern.Askable

import scala.collection.mutable.ArrayBuffer
import com.cluster.graph.entity.{EntityId, VertexEntityType}
import com.preprocessing.partitioning.oneDim.{Main, Mirror}
import com.algorithm.Colour
import scala.concurrent.{Await, Future}
import akka.util.Timeout
import scala.concurrent.duration._
import com.cluster.graph.GlobalCoordinator.FinalValuesResponseComplete
import com.cluster.graph.GlobalCoordinator.FinalValuesResponseNotFinished

object ClusterShardingApp {

Expand Down Expand Up @@ -168,6 +175,33 @@ object ClusterShardingApp {
assert(totalMainsAckd == nMains)
gcRef ! GlobalCoordinator.BEGIN()
// TODO at beginning send, BEGIN(0)

// Wait until finished


var finalVals: Map[Int, Option[Colour]] = null
while(null == finalVals){
Thread.sleep(1000)

val timeout: Timeout = 5.seconds
val sched = entityManager.scheduler
val future: Future[GlobalCoordinator.FinalValuesResponse] = gcRef.ask(ref => GlobalCoordinator.GetFinalValues(ref))(timeout,sched)
Await.result(future, Duration.Inf) match {
case FinalValuesResponseComplete(valueMap) => {
finalVals = valueMap
}
case FinalValuesResponseNotFinished => ()
}

}

println("Final Values from main app:")
for((key, value) <- finalVals){
println(s"$key -> $value")
}

// TODO shut down actor system

// increment mains and their mirrors
// for (main <- png.mainArray)
// entityManager ! EntityManager.AddOne(
Expand Down
48 changes: 47 additions & 1 deletion src/main/scala/com/cluster/graph/GlobalCoordinator.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@ import com.Typedefs.{GCRef, PCRef}

import scala.concurrent.duration.DurationInt
import scala.concurrent.{Await, Future}
import com.cluster.graph.entity.VertexEntity
import com.algorithm.VertexProgram
import akka.stream.javadsl.Partition
import com.algorithm.Colour
import com.fasterxml.jackson.databind.annotation.JsonDeserialize

/** The central coordinator monitors the state of every vertex in the graph (via the
* partitionCoordinator), to ensure all vertices are performing the computation for the same
Expand Down Expand Up @@ -40,6 +45,9 @@ class GlobalCoordinator(ctx: ActorContext[GlobalCoordinator.Command])
var numPartitions = -1
var numNodes = -1

val finalValues = collection.mutable.Map[Int, VertexEntity.VertexValT]()
var nPCFinalValues = 0

def globallyDone(stepNum: Int): Boolean = {
doneCounter(stepNum) + voteCounter(stepNum) == numPartitions
}
Expand Down Expand Up @@ -73,7 +81,8 @@ class GlobalCoordinator(ctx: ActorContext[GlobalCoordinator.Command])
println(s"gc : step ${stepNum}: done counter${doneCounter(stepNum)}")

if (globallyDone(stepNum)) {
println("globally done")
println("globally done =========================================================================================================================================")
println(s"beginning superstep ${stepNum + 1}")
broadcastBEGINToPCs(stepNum + 1)
}
Behaviors.same
Expand All @@ -84,6 +93,10 @@ class GlobalCoordinator(ctx: ActorContext[GlobalCoordinator.Command])
if (globallyTerminated(stepNum)) {
println("TERMINATION")
//TODO TERMINATE..?
nPCFinalValues = 0
for((pid, pcRef) <- pcRefs) {
pcRef ! PartitionCoordinator.GetFinalValues
}
} else if (globallyDone(stepNum)) {
broadcastBEGINToPCs(stepNum + 1)
}
Expand Down Expand Up @@ -111,6 +124,30 @@ class GlobalCoordinator(ctx: ActorContext[GlobalCoordinator.Command])
replyTo ! GetPCRefsResponse(pcRefs)
Behaviors.same

case FinalValues(valueMap) => {
finalValues ++= valueMap

nPCFinalValues += 1

// if(nPCFinalValues == numPartitions) {
// println("Final Values:")

// for((vtx, value) <- finalValues) {
// println(s"${vtx} -> ${value}")
// }
// }

Behaviors.same
}

case GetFinalValues(replyTo) => {
if(nPCFinalValues == numPartitions) {
replyTo ! FinalValuesResponseComplete(finalValues.toMap)
} else {
replyTo ! FinalValuesResponseNotFinished
}
Behaviors.same
}
}
}
}
Expand All @@ -135,6 +172,11 @@ object GlobalCoordinator {
final case class DONE(stepNum: Int) extends Command
final case class TerminationVote(stepNum: Int) extends Command
final case class BEGIN() extends Command
/**
* JsonDeserialize tag needed. See open issue: https://github.com/akka/akka/issues/28566
*/
final case class FinalValues(@JsonDeserialize(keyAs = classOf[Int]) valueMap: collection.immutable.Map[Int, VertexEntity.VertexValT]) extends Command
final case class GetFinalValues(replyTo: ActorRef[FinalValuesResponse]) extends Command

// Init Sync Commands
final case class GetPCRefs(replyTo: ActorRef[GetPCRefsResponse]) extends Command
Expand All @@ -150,4 +192,8 @@ object GlobalCoordinator {
final case class GetPCRefsResponse(pcRefs: collection.mutable.Map[Int, PCRef]) extends Response
final case class BroadcastRefResponse(message: String) extends Response
final case class InitializeResponse(message: String) extends Response

sealed trait FinalValuesResponse extends Response
final case class FinalValuesResponseComplete(@JsonDeserialize(keyAs = classOf[Int]) valueMap: collection.immutable.Map[Int, VertexEntity.VertexValT]) extends FinalValuesResponse
final case object FinalValuesResponseNotFinished extends FinalValuesResponse
}
22 changes: 22 additions & 0 deletions src/main/scala/com/cluster/graph/PartitionCoordinator.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import com.cluster.graph.entity.{EntityId, MainEntity, VertexEntity}
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.duration.DurationInt
import scala.concurrent.{Await, Future}
import com.algorithm.VertexProgram

/** PartitionCoordinator Actor Exists on each partition of the graph, is aware of all the main
* vertices on its partition When a main vertex completes its computation for the current
Expand Down Expand Up @@ -52,6 +53,8 @@ class PartitionCoordinator(
// counts the number of vertices in this partition that have voted to terminate their computation
var voteCounter = collection.mutable.Map[Int, Int]().withDefaultValue(0)

val finalValues = collection.mutable.Map[Int, VertexEntity.VertexValT]()

def locallyDone(stepNum: Int): Boolean = {
doneCounter(stepNum) + voteCounter(stepNum) == nMains
}
Expand Down Expand Up @@ -134,6 +137,23 @@ class PartitionCoordinator(
case AdaptedResponse(message) =>
ctx.log.info("Got response from hal: {}", message)
Behaviors.same

case FinalValue(vtx, value) => {
println(s"Received value: ${vtx} -> ${value}")
finalValues += (vtx -> value)
if(finalValues.size == nMains) {
gcRef ! GlobalCoordinator.FinalValues(finalValues.toMap)
}
Behaviors.same
}

case GetFinalValues => {
for (m <- mains) {
val eRef = sharding.entityRefFor(VertexEntity.TypeKey, m.toString)
eRef ! VertexEntity.GetFinalValue
}
Behaviors.same
}
}
}
}
Expand Down Expand Up @@ -179,6 +199,8 @@ object PartitionCoordinator {
final case class DONE(stepNum: Int) extends Command
final case class TerminationVote(stepNum: Int) extends Command
final case class BEGIN(stepNum: Int) extends Command
final case object GetFinalValues extends Command
final case class FinalValue(vtx: Int, value: VertexEntity.VertexValT) extends Command

private case class AdaptedResponse(message: String) extends Command
case object Idle extends Command
Expand Down
5 changes: 5 additions & 0 deletions src/main/scala/com/cluster/graph/entity/MainEntity.scala
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,11 @@ class MainEntity(
Behaviors.same
}

case GetFinalValue => {
pcRef ! PartitionCoordinator.FinalValue(vertexId, currentValue)
Behaviors.same
}

case VertexEntity.Idle =>
entityContext.shard ! ClusterSharding.Passivate(ctx.self)
Behaviors.same
Expand Down
13 changes: 7 additions & 6 deletions src/main/scala/com/cluster/graph/entity/VertexEntity.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@ import akka.actor.typed.{ActorRef, Behavior}
import akka.actor.typed.scaladsl.Behaviors
import akka.cluster.sharding.typed.scaladsl.{ClusterSharding, EntityContext, EntityTypeKey}
import com.CborSerializable
import com.algorithm.LocalMaximaColouring
import com.algorithm.Colour
import com.algorithm._
import com.cluster.graph.PartitionCoordinator

object VertexEntity {
Expand All @@ -20,6 +19,8 @@ object VertexEntity {
type VertexValT = Option[Colour]
type SuperStep = Int

// Commands

sealed trait Command extends CborSerializable
trait Response extends CborSerializable

Expand All @@ -28,11 +29,14 @@ object VertexEntity {

val TypeKey = EntityTypeKey[VertexEntity.Command]("VertexEntity")

// GAS General Commands
// GAS Commands
case class Begin(stepNum: Int) extends Command
case object End extends Command
final case class NeighbourMessage(stepNum: Int, edgeVal: Option[EdgeValT], msg: Option[MessageT])
extends Command
final case class MirrorTotal(stepNum: Int, total: Option[AccumulatorT]) extends Command
final case class ApplyResult(stepNum: Int, oldVal: VertexValT, newVal: Option[VertexValT]) extends Command
final case object GetFinalValue extends Command

// PartitionCoordinator Commands
final case class NotifyLocation(replyTo: ActorRef[LocationResponse]) extends Command
Expand Down Expand Up @@ -64,9 +68,6 @@ object VertexEntity {
// Init Sync Response
final case class InitializeResponse(message: String) extends Response

// GAS
final case class MirrorTotal(stepNum: Int, total: Option[AccumulatorT]) extends Command
final case class ApplyResult(stepNum: Int, oldVal: VertexValT, newVal: Option[VertexValT]) extends Command

// Counter actions TESTING ONLY
case object Increment extends Command
Expand Down

0 comments on commit e84d506

Please sign in to comment.