Skip to content

Commit

Permalink
Merge branch 'main' of github.com:atrostan/akka-gps into preproc/hybr…
Browse files Browse the repository at this point in the history
…id-partitioning
  • Loading branch information
atrostan committed Nov 16, 2021
2 parents c6adbe6 + ba4f81c commit ac838d2
Show file tree
Hide file tree
Showing 19 changed files with 586 additions and 212 deletions.
6 changes: 3 additions & 3 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ lazy val `akka-gps` = project
scalaVersion := "2.12.15",
Compile / scalacOptions ++= Seq("-deprecation", "-feature", "-unchecked", "-Xlog-reflective-calls", "-Xlint", "-target:jvm-1.8"),
Compile / javacOptions ++= Seq("-Xlint:unchecked", "-Xlint:deprecation"),
Compile / PB.targets := Seq(
scalapb.gen() -> (Compile / sourceManaged).value / "scalapb"
),
// Compile / PB.targets := Seq(
// scalapb.gen() -> (Compile / sourceManaged).value / "scalapb"
// ),
run / javaOptions ++= Seq("-Xms128m", "-Xmx8G", "-XX:+UseG1GC", "-Djava.library.path=./target/native", "-Dlog4j.configuration=src/main/resources/log4j.properties"),
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-actor-typed" % akkaVersion,
Expand Down
4 changes: 2 additions & 2 deletions project/protoc.sbt
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
addSbtPlugin("com.thesamet" % "sbt-protoc" % "1.0.2")
// addSbtPlugin("com.thesamet" % "sbt-protoc" % "1.0.2")

libraryDependencies += "com.thesamet.scalapb" %% "compilerplugin" % "0.11.3"
// libraryDependencies += "com.thesamet.scalapb" %% "compilerplugin" % "0.11.3"
6 changes: 3 additions & 3 deletions src/main/scala/com/Conversion.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import akka.actor.typed
import akka.actor.typed.ActorRefResolver
import akka.actor.typed.scaladsl.adapter._
import akka.serialization.Serialization
import scalapb.TypeMapper
// import scalapb.TypeMapper

import java.nio.charset.StandardCharsets.UTF_8

Expand All @@ -18,8 +18,8 @@ object Conversion {
Serialization.getCurrentTransportInformation().system.toTyped
}

implicit def mapper[T]: TypeMapper[String, ActorRef[T]] =
TypeMapper[String, ActorRef[T]](resolver.resolveActorRef)(serialize)
// implicit def mapper[T]: TypeMapper[String, ActorRef[T]] =
// TypeMapper[String, ActorRef[T]](resolver.resolveActorRef)(serialize)

def deserialize[T](str: String) = resolver.resolveActorRef[T](str)
def serialize[T](ref: ActorRef[T]) =
Expand Down
12 changes: 10 additions & 2 deletions src/main/scala/com/algorithm/LocalMaximaColouring.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@ case class Colour(num: Int) {
require(num >= 0)
}

object LocalMaximaColouring extends VertexProgram[Int, Int, Int, Set[Int], Option[Colour]] {

trait LocalMaximalColouringAbstractMode extends VertexProgram[Int, Int, Int, Set[Int], Option[Colour]] {
override def gather(edgeVal: Int, message: Int): Set[Int] = Set(message)

override def sum(a: Set[Int], b: Set[Int]): Set[Int] = a.union(b)
Expand Down Expand Up @@ -63,3 +62,12 @@ object LocalMaximaColouring extends VertexProgram[Int, Int, Int, Set[Int], Optio

override val defaultActivationStatus: Boolean = true
}

object LocalMaximaColouring extends LocalMaximalColouringAbstractMode {
override val mode = VertexProgram.Outwards
}

object LocalMaximaColouringBidirectional extends LocalMaximalColouringAbstractMode {
override val mode: VertexProgram.Mode = VertexProgram.Bidirectional

}
2 changes: 2 additions & 0 deletions src/main/scala/com/algorithm/SSSP.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package com.algorithm

object SSSP extends VertexProgram[Int, Int, Int, Int, Int] {

override val mode = VertexProgram.Outwards

override def gather(edgeVal: Int, message: Int): Int = {
edgeVal + message
}
Expand Down
26 changes: 16 additions & 10 deletions src/main/scala/com/algorithm/SequentialRun.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,19 @@ package com.algorithm

import scalax.collection.Graph
import scalax.collection.edge.WDiEdge
import com.algorithm.VertexProgram.Outwards
import com.algorithm.VertexProgram.Inwards
import com.algorithm.VertexProgram.Bidirectional

object SequentialRun {
def apply[VertexIdT, MessageT, AccumulatorT, VertexValT](
vertexProgram: VertexProgram[VertexIdT, Int, MessageT, AccumulatorT, VertexValT],
graph: Graph[VertexIdT, WDiEdge]
)(
initialStates: Map[graph.NodeT, VertexValT],
initialActiveMap: Map[graph.NodeT, Boolean]
): Map[graph.NodeT, VertexValT] = {

type Vertex = graph.NodeT
// var vertices: Seq[Int] = graph.nodes.toSeq.mzap({x:graph.NodeT => x.value})
var vertices = graph.nodes
val vertices = graph.nodes
var states = vertices.map(v => (v -> vertexProgram.defaultVertexValue)).toMap
var activeMap = vertices.map(v => (v -> vertexProgram.defaultActivationStatus)).toMap

Expand All @@ -31,9 +31,16 @@ object SequentialRun {
def sendMessage(dest: Vertex, edge: graph.EdgeT, msg: MessageT): Unit = {
nextMailboxes = nextMailboxes.updated(dest, nextMailboxes(dest).updated(edge, msg))
}
def outEdges(src: Vertex): Iterable[graph.EdgeT] = {
graph.edges.filter(edge => edge._1 == src)
}

def relevantEdges(v: Vertex): Iterable[(graph.NodeT, graph.EdgeT)] = {
val outEdges = graph.edges.filter(edge => edge._1 == v).map(edge => (edge._2, edge))
val inEdges = graph.edges.filter(edge => edge._2 == v).map(edge => (edge._1, edge))
vertexProgram.mode match {
case Outwards => outEdges
case Inwards => inEdges
case Bidirectional => outEdges ++ inEdges
}
}

var progressFlag = true

Expand Down Expand Up @@ -73,11 +80,10 @@ object SequentialRun {

// Scatter
for {
edge <- outEdges(vtx)
dest = edge._2
(msgDest, edge) <- relevantEdges(vtx)
msg <- vertexProgram.scatter(vtx, oldVal, newVal)
} {
sendMessage(dest, edge, msg)
sendMessage(msgDest, edge, msg)
}

val activation = !vertexProgram.voteToHalt(oldVal, newVal)
Expand Down
11 changes: 11 additions & 0 deletions src/main/scala/com/algorithm/VertexProgram.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,17 @@
package com.algorithm

object VertexProgram {
sealed trait Mode
case object Outwards extends Mode // Send messages to out-neighbours
case object Inwards extends Mode // Send messages to in-neighbours
case object Bidirectional extends Mode // Send messages to both out-neighbours and in-neighbours
}

// Stateless
trait VertexProgram[VertexIdT, EdgeValT, MessageT, AccumulatorT, VertexValT] {

val mode: VertexProgram.Mode

def gather(edgeVal: EdgeValT, message: MessageT): AccumulatorT

def sum(a: AccumulatorT, b: AccumulatorT): AccumulatorT
Expand All @@ -18,5 +28,6 @@ trait VertexProgram[VertexIdT, EdgeValT, MessageT, AccumulatorT, VertexValT] {
def voteToHalt(oldVal: VertexValT, newVal: VertexValT): Boolean

val defaultVertexValue: VertexValT

val defaultActivationStatus: Boolean
}
43 changes: 43 additions & 0 deletions src/main/scala/com/algorithm/WCC.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package com.algorithm

import scalax.collection.edge.Implicits._
import scalax.collection.Graph // or scalax.collection.mutable.Graph
import scalax.collection.GraphPredef._, scalax.collection.GraphEdge._
import scalax.collection.edge.WDiEdge

object WCC extends VertexProgram[Int, Int, Int, Int, Int] {

override val mode: VertexProgram.Mode = VertexProgram.Bidirectional

override def gather(edgeVal: Int, message: Int): Int = {
message
}

override def sum(a: Int, b: Int): Int = {
Math.min(a, b)
}

override def apply(superStepNumber: Int, thisVertexId: Int, oldVal: Int, total: Option[Int]): Int = {
if(superStepNumber == 0) {
thisVertexId
} else total match {
case Some(componentId) => Math.min(oldVal, componentId)
case None => oldVal
}
}

override def scatter(thisVertexId: Int, oldVal: Int, newVal: Int): Option[Int] = {
if(newVal < oldVal) {
Some(newVal)
} else {
assert(newVal == oldVal, s"Unexpected newVal=${newVal}, oldVal=${oldVal}")
None
}
}

override def voteToHalt(oldVal: Int, newVal: Int): Boolean = true

override val defaultVertexValue: Int = Integer.MAX_VALUE

override val defaultActivationStatus: Boolean = true
}
67 changes: 37 additions & 30 deletions src/main/scala/com/cluster/graph/ClusterShardingApp.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import com.cluster.graph.entity.{EntityId, VertexEntityType}
import com.typesafe.config.{Config, ConfigFactory}

import scala.collection.mutable.ArrayBuffer
import com.cluster.graph.entity.{EntityId, VertexEntityType}
import com.preprocessing.partitioning.oneDim.{Main, Mirror}

object ClusterShardingApp {

Expand Down Expand Up @@ -69,7 +71,7 @@ object ClusterShardingApp {
partCoordMap(pid) = pcPort

val entityManager = ActorSystem[EntityManager.Command](
EntityManager(partitionMap, png.mainArray, pid),
EntityManager(partitionMap, png.mainArray, pid, png.inEdgePartition),
"ClusterSystem",
shardConfig
)
Expand All @@ -83,7 +85,7 @@ object ClusterShardingApp {

val frontConfig = createConfig(frontRole, frontPort)
val entityManager = ActorSystem[EntityManager.Command](
EntityManager(partitionMap, png.mainArray, pid),
EntityManager(partitionMap, png.mainArray, pid, png.inEdgePartition),
"ClusterSystem",
frontConfig
)
Expand Down Expand Up @@ -131,7 +133,12 @@ object ClusterShardingApp {
main.id,
main.partition.id,
main.neighbors.map(n =>
new EntityId(VertexEntityType.Main.toString(), n.id, n.partition.id)
n match {
case neighbor: Main =>
new EntityId(VertexEntityType.Main.toString(), neighbor.id, neighbor.partition.id)
case neighbor: Mirror =>
new EntityId(VertexEntityType.Mirror.toString(), neighbor.id, neighbor.partition.id)
}
)
)
}
Expand Down Expand Up @@ -159,36 +166,36 @@ object ClusterShardingApp {
}
println(s"Total Mains Acknowledged: $totalMainsAckd")
assert(totalMainsAckd == nMains)

gcRef ! GlobalCoordinator.BEGIN()
// TODO at beginning send, BEGIN(0)
// increment mains and their mirrors
for (main <- png.mainArray)
entityManager ! EntityManager.AddOne(
VertexEntityType.Main.toString(),
main.id,
main.partition.id
)
for (main <- png.mainArray)
entityManager ! EntityManager.AddOne(
VertexEntityType.Main.toString(),
main.id,
main.partition.id
)
// for (main <- png.mainArray)
// entityManager ! EntityManager.AddOne(
// VertexEntityType.Main.toString(),
// main.id,
// main.partition.id
// )
// for (main <- png.mainArray)
// entityManager ! EntityManager.AddOne(
// VertexEntityType.Main.toString(),
// main.id,
// main.partition.id
// )

// see if increments have been propagated correctly to mirrors
for (main <- png.mainArray) {
entityManager ! EntityManager.GetSum(
VertexEntityType.Main.toString(),
main.id,
main.partition.id
)
for (mirror <- main.mirrors) {
entityManager ! EntityManager.GetSum(
VertexEntityType.Mirror.toString(),
mirror.id,
mirror.partition.id
)
}
}
// for (main <- png.mainArray) {
// entityManager ! EntityManager.GetSum(
// VertexEntityType.Main.toString(),
// main.id,
// main.partition.id
// )
// for (mirror <- main.mirrors) {
// entityManager ! EntityManager.GetSum(
// VertexEntityType.Mirror.toString(),
// mirror.id,
// mirror.partition.id
// )
// }
// }
}
}
23 changes: 16 additions & 7 deletions src/main/scala/com/cluster/graph/EntityManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import akka.cluster.sharding.typed.scaladsl.{ClusterSharding, Entity, EntityRef}
import akka.cluster.typed.Cluster
import akka.util.Timeout
import com.CborSerializable
import com.cluster.graph.GlobalCoordinator.GlobalCoordinatorKey
import com.cluster.graph.Init.{blockInitMain, blockInitMirror, blockInitPartitionCoordinator}
import com.cluster.graph.entity._
import com.preprocessing.partitioning.oneDim.Main
Expand All @@ -27,7 +26,8 @@ class EntityManager(
ctx: ActorContext[EntityManager.Command],
partitionMap: collection.mutable.Map[Int, Int],
mainArray: Array[Main],
pid: Int
pid: Int,
inEdgePartition: Array[collection.mutable.Map[Int, Int]]
) extends AbstractBehavior[EntityManager.Command](ctx) {

import EntityManager._
Expand Down Expand Up @@ -154,17 +154,25 @@ class EntityManager(
sharding.entityRefFor(VertexEntity.TypeKey, eid.toString)
// initialize all mirrors of main // TODO Review main check is needed anymore
if (isMain(eid)) {
val mainERef: EntityRef[MainEntity.Initialize] =
val mainERef: EntityRef[VertexEntity.Initialize] =
sharding.entityRefFor(VertexEntity.TypeKey, eid.toString)
val mirrors = mainArray(eid.vertexId).mirrors.map(m =>
new EntityId(VertexEntityType.Mirror.toString(), m.id, m.partition.id)
)
// TODO Pass partitionInDegree to all vertices being created
val nInEdges = inEdgePartition(eid.partitionId)(eid.vertexId)
// println("eid main, ", eid.partitionId, eid.vertexId, nInEdges)
totalMainsInitialized =
blockInitMain(mainERef, eid, neighbors, mirrors, totalMainsInitialized)
blockInitMain(mainERef, eid, neighbors, mirrors, nInEdges, totalMainsInitialized)
for (m <- mirrors) {
val mirrorERef: EntityRef[VertexEntity.Command] =
sharding.entityRefFor(VertexEntity.TypeKey, m.toString)
totalMirrorsInitialized = blockInitMirror(mirrorERef, m, eid, totalMirrorsInitialized)
// TODO Need to add neighbours
val neighbors = ArrayBuffer[EntityId]()
val nInEdges = inEdgePartition(m.partitionId)(m.vertexId)
// println("eid mirror, ", m.partitionId, m.vertexId, nInEdges)

totalMirrorsInitialized = blockInitMirror(mirrorERef, m, eid, neighbors, nInEdges, totalMirrorsInitialized)
}
}
}
Expand Down Expand Up @@ -195,12 +203,13 @@ object EntityManager {
def apply(
partitionMap: collection.mutable.Map[Int, Int],
mainArray: Array[Main],
pid: Int
pid: Int,
inEdgePartition: Array[collection.mutable.Map[Int, Int]]
): Behavior[EntityManager.Command] = Behaviors.setup(ctx => {
val EntityManagerKey =
ServiceKey[EntityManager.Command](s"entityManager${pid}")
ctx.system.receptionist ! Receptionist.Register(EntityManagerKey, ctx.self)
new EntityManager(ctx, partitionMap, mainArray, pid)
new EntityManager(ctx, partitionMap, mainArray, pid, inEdgePartition)
})
// command/response typedef
sealed trait Command extends CborSerializable
Expand Down
5 changes: 5 additions & 0 deletions src/main/scala/com/cluster/graph/GlobalCoordinator.scala
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,19 @@ class GlobalCoordinator(ctx: ActorContext[GlobalCoordinator.Command])

case DONE(stepNum) =>
doneCounter(stepNum) += 1
println(s"gc : step ${stepNum}: done counter${doneCounter(stepNum)}")

if (globallyDone(stepNum)) {
println("globally done")
broadcastBEGINToPCs(stepNum + 1)
}
Behaviors.same

case TerminationVote(stepNum) =>

voteCounter(stepNum) += 1
if (globallyTerminated(stepNum)) {
println("TERMINATION")
//TODO TERMINATE..?
} else if (globallyDone(stepNum)) {
broadcastBEGINToPCs(stepNum + 1)
Expand Down
Loading

0 comments on commit ac838d2

Please sign in to comment.