Skip to content

Commit

Permalink
test custom configs for sharding app;
Browse files Browse the repository at this point in the history
add tagged neighbours for mains and mirrors RDDs;
modify entity managers to initialize using partition specific mains, mirrors;
modify VertexEntity neighbors to include edge weights;
tag edges' src, dests with type:
main -> main, main -> mrr, mrr -> main, or mrr -> mrr;
reformat, autoindent partition.util;
  • Loading branch information
atrostan committed Nov 29, 2021
1 parent 669f608 commit 074caea
Show file tree
Hide file tree
Showing 11 changed files with 538 additions and 216 deletions.
8 changes: 7 additions & 1 deletion src/main/resources/cluster.conf
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,10 @@ akka {
}
downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider"
}
}
}

ec2 {
num-partitions = 4
partitions-paths = [
]
}
8 changes: 4 additions & 4 deletions src/main/resources/paths.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
workers:
- src/main/resources/graphs/8rmat/partitions/hybrid/bySrc/p0
- src/main/resources/graphs/8rmat/partitions/hybrid/bySrc/p1
- src/main/resources/graphs/8rmat/partitions/hybrid/bySrc/p2
- src/main/resources/graphs/8rmat/partitions/hybrid/bySrc/p3
- /home/atrostan/Workspace/akka-gps/src/main/resources/graphs/8rmat/partitions/hybrid/bySrc/p0
- /home/atrostan/Workspace/akka-gps/src/main/resources/graphs/8rmat/partitions/hybrid/bySrc/p1
- /home/atrostan/Workspace/akka-gps/src/main/resources/graphs/8rmat/partitions/hybrid/bySrc/p2
- /home/atrostan/Workspace/akka-gps/src/main/resources/graphs/8rmat/partitions/hybrid/bySrc/p3
14 changes: 14 additions & 0 deletions src/main/scala/com/Typedefs.scala
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,24 @@ object Typedefs {
Int // indegree in partition pid
)

type TaggedMainRow = (
(Int, Int), // (vid, pid)
Set[_ <: Int], // partitions that contain mirrors
List[(Int, Int, Int)], // list of tagged outgoing edges on partition pid
Int // indegree in partition pid
)

type MirrorRow = (
(Int, Int), // (vid, pid)
Int, // partition that contains main
List[(Int, Int)], // list of outgoing edges on partition pid
Int // indegree in partition pid
)

type TaggedMirrorRow = (
(Int, Int), // (vid, pid)
Int, // partition that contains main
List[(Int, Int, Int)], // list of tagged outgoing edges on partition pid
Int // indegree in partition pid
)
}
85 changes: 63 additions & 22 deletions src/main/scala/com/cluster/graph/ClusterShardingApp.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,17 @@ package com.cluster.graph
import akka.actor.typed._
import akka.cluster.{ClusterEvent, Member}
import com.Typedefs.{EMRef, GCRef, PCRef}
import com.cluster.graph.EntityManager.{InitializeMains, InitializeMirrors}
import com.cluster.graph.Init._
import com.cluster.graph.PartitionCoordinator.BroadcastLocation
import com.cluster.graph.entity.{EntityId, VertexEntityType}
import com.preprocessing.partitioning.Util.{readMainPartitionDF, readMirrorPartitionDF, readWorkerPathsFromYaml}
import com.preprocessing.partitioning.oneDim.{Main, Mirror}
import com.typesafe.config.{Config, ConfigFactory}
import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable.ArrayBuffer
import com.cluster.graph.entity.{EntityId, VertexEntityType}
import com.preprocessing.partitioning.oneDim.{Main, Mirror}

object ClusterShardingApp {

Expand Down Expand Up @@ -41,16 +44,28 @@ object ClusterShardingApp {

def main(args: Array[String]): Unit = {

val appName: String = "akka.clusterShardingApp"
val conf = new SparkConf()
.setAppName(appName)
.setMaster("local[*]")
val sc = new SparkContext(conf)
sc.setLogLevel("ERROR")
val spark: SparkSession = SparkSession.builder.getOrCreate

val hadoopConfig = sc.hadoopConfiguration
hadoopConfig.set("fs.hdfs.impl", classOf[org.apache.hadoop.hdfs.DistributedFileSystem].getName)
hadoopConfig.set("fs.file.impl", classOf[org.apache.hadoop.fs.LocalFileSystem].getName)

val workerPaths = "src/main/resources/paths.yaml"
// a map between partition ids to location on hdfs of mains, mirrors for that partition
val workerMap: Map[Int, String] = readWorkerPathsFromYaml(workerPaths: String)

val png = initGraphPartitioning(numberOfShards)

val config = ConfigFactory.load("cluster")
println(config.getConfig("ec2"))
val nodesUp = collection.mutable.Set[Member]()
for (m <- png.mainArray) {
var s = ""
s += s"${m.toString()}\n"
s += s"neighbours:\t${m.neighbors.toString()}\n"
s += s"mirrors:\t${m.mirrors.toString()}\n"
println(s)
}
return

println(s"Initializing cluster with ${nNodes} compute nodes")

println("Initializing domain listener")
Expand All @@ -66,10 +81,15 @@ object ClusterShardingApp {
val shardPorts = ArrayBuffer[Int](25252, 25253, 25254, 25255)
val shardActors = ArrayBuffer[ActorSystem[EntityManager.Command]]()
var pid = 0
val nMains = png.mainArray.length
val nMirrors = png.mainArray.map(m => m.mirrors.length).sum

var nMains = 0
var nMirrors = 0
for (shardPort <- shardPorts) {
val path = workerMap(pid)
val mains = readMainPartitionDF(path + "/mains", spark).collect()
nMains += mains.length
val mirrors = readMirrorPartitionDF(path + "/mirrors", spark).collect()
nMirrors += mirrors.length

val pcPort = shardPort + numberOfShards
val shardConfig = createConfig("shard", shardPort)
val pcConfig = createConfig("partitionCoordinator", pcPort)
Expand All @@ -78,7 +98,7 @@ object ClusterShardingApp {
partCoordMap(pid) = pcPort

val entityManager = ActorSystem[EntityManager.Command](
EntityManager(partitionMap, png.mainArray, pid, png.inEdgePartition),
EntityManager(partitionMap, png.mainArray, pid, png.inEdgePartition, mains, mirrors),
"ClusterSystem",
shardConfig
)
Expand All @@ -87,12 +107,13 @@ object ClusterShardingApp {
pid += 1
}


val frontPort = shardPorts.last + 1
val frontRole = "front"

val frontConfig = createConfig(frontRole, frontPort)
val entityManager = ActorSystem[EntityManager.Command](
EntityManager(partitionMap, png.mainArray, pid, png.inEdgePartition),
EntityManager(partitionMap, png.mainArray, pid, png.inEdgePartition, null, null),
"ClusterSystem",
frontConfig
)
Expand Down Expand Up @@ -133,6 +154,33 @@ object ClusterShardingApp {
println("Broadcasting the Global Coordinator address to all Partition Coordinators")
broadcastGCtoPCs(gcRef, entityManager.scheduler)

println("here are the em refs:")
emRefs.foreach(println)

for (pid <- 0 until numberOfShards) {
val emRef = emRefs(pid)
emRef ! InitializeMains
emRef ! InitializeMirrors
}


var nMainsInitialized = 0
var nMirrorsInitialized = 0

for (pid <- 0 until numberOfShards) {
val emRef: EMRef = emRefs(pid)
nMainsInitialized += getNMainsInitialized(entityManager, emRef)
nMirrorsInitialized += getNMirrorsInitialized(entityManager, emRef)
}


println("Checking that all Mains, Mirrors have been initialized...")
println(s"Total Mains Initialized: $nMainsInitialized")
println(s"Total Mirrors Initialized: $nMirrorsInitialized")
assert(nMainsInitialized == nMains)
assert(nMirrorsInitialized == nMirrors)
return

println(s"Initializing ${nMains} Mains and ${nMirrors} Mirrors...")
for (main <- png.mainArray) {
entityManager ! EntityManager.Initialize(
Expand All @@ -150,14 +198,7 @@ object ClusterShardingApp {
)
}

val nMainsInitialized = getNMainsInitialized(entityManager)
val nMirrorsInitialized = getNMirrorsInitialized(entityManager)

println("Checking that all Mains, Mirrors have been initialized...")
println(s"Total Mains Initialized: $nMainsInitialized")
println(s"Total Mirrors Initialized: $nMirrorsInitialized")
assert(nMainsInitialized == nMains)
assert(nMirrorsInitialized == nMirrors)
// after initialization, each partition coordinator should broadcast its location to its mains
for ((pid, pcRef) <- pcRefs) {
println(s"PC${pid} Broadcasting location to its main")
Expand Down
88 changes: 75 additions & 13 deletions src/main/scala/com/cluster/graph/EntityManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ class EntityManager(
partitionMap: collection.mutable.Map[Int, Int],
mainArray: Array[Main],
pid: Int,
inEdgePartition: Array[collection.mutable.Map[Int, Int]]
partitionInDegree: Array[collection.mutable.Map[Int, Int]],
tmpmains: Array[(Int, Set[Int], List[(Int, Int, Int)], Int)],
tmpmirrors: Array[(Int, Int, List[(Int, Int, Int)], Int)]
) extends AbstractBehavior[EntityManager.Command](ctx) {

import EntityManager._
Expand Down Expand Up @@ -65,6 +67,50 @@ class EntityManager(

override def onMessage(msg: EntityManager.Command): Behavior[EntityManager.Command] = {
msg match {
case InitializeMains =>
println(s"Got init mains on EM${pid}")
tmpmains.map {
case (vid, mirrorPids, neighs, partitionInDegree) =>
val eid = new EntityId(VertexEntityType.Main.toString(), vid, pid)
val mainERef: EntityRef[VertexEntity.Initialize] =
sharding.entityRefFor(VertexEntity.TypeKey, eid.toString)
val neighbors = neighs.map {
case (dest, wt, tag) =>
if (tag == 0) { // main -> main
(new EntityId(VertexEntityType.Main.toString(), dest, pid), wt)
} else { // main -> mirror
(new EntityId(VertexEntityType.Mirror.toString(), dest, pid), wt)
}
}
val mirrors = mirrorPids.map(pid =>
new EntityId(VertexEntityType.Mirror.toString(), vid, pid)
).toList
totalMainsInitialized = blockInitMain(mainERef, eid, neighbors, mirrors, partitionInDegree, totalMainsInitialized)
}
println(s"${pid} total mains initialized: ", totalMainsInitialized)
Behaviors.same

case InitializeMirrors =>
println(s"Got init mirrors on EM${pid}")
tmpmirrors.map {
case (vid, mainPid, neighs, partitionInDegree) =>
val mid = new EntityId(VertexEntityType.Main.toString(), vid, mainPid)
val eid = new EntityId(VertexEntityType.Mirror.toString(), vid, pid)
val mirrorERef: EntityRef[VertexEntity.Command] =
sharding.entityRefFor(VertexEntity.TypeKey, eid.toString)
val neighbors = neighs.map {
case (dest, wt, tag) =>
if (tag == 2) { // mirror -> main
(new EntityId(VertexEntityType.Main.toString(), dest, pid), wt)
} else { // mirror -> mirror
(new EntityId(VertexEntityType.Mirror.toString(), dest, pid), wt)
}
}
totalMirrorsInitialized = blockInitMirror(mirrorERef, eid, mid, neighbors, partitionInDegree, totalMirrorsInitialized)
}
println(s"${pid} total mirrors initialized: ", totalMirrorsInitialized)
Behaviors.same

case Initialize(eCl, vid, pid, neighbors) =>
val eid = new EntityId(eCl, vid, pid)
initMainAndMirrors(eid, neighbors)
Expand Down Expand Up @@ -123,10 +169,12 @@ class EntityManager(
}

def spawnPartitionCoordinator(pid: Int): ActorRef[PartitionCoordinator.Command] = {
val mains = mainArray
.filter(m => m.partition.id == pid)
.map(m => new EntityId("Main", m.id, pid))
.toList
// val mains = mainArray

// .filter(m => m.partition.id == pid)
// .map(m => new EntityId("Main", m.id, pid))
// .toList
val mains = tmpmains.map(m => new EntityId("Main", m._1, pid)).toList
val pcChild = ctx.spawn(
Behaviors.supervise(PartitionCoordinator(mains, pid)).onFailure(SupervisorStrategy.restart),
name = s"pc$pid"
Expand Down Expand Up @@ -160,19 +208,20 @@ class EntityManager(
new EntityId(VertexEntityType.Mirror.toString(), m.id, m.partition.id)
)
// TODO Pass partitionInDegree to all vertices being created
val nInEdges = inEdgePartition(eid.partitionId)(eid.vertexId)
val nInEdges = partitionInDegree(eid.partitionId)(eid.vertexId)
// println("eid main, ", eid.partitionId, eid.vertexId, nInEdges)
totalMainsInitialized =
blockInitMain(mainERef, eid, neighbors, mirrors, nInEdges, totalMainsInitialized)
// totalMainsInitialized =
// blockInitMain(mainERef, eid, neighbors, mirrors, nInEdges, totalMainsInitialized)
for (m <- mirrors) {
val mirrorERef: EntityRef[VertexEntity.Command] =
sharding.entityRefFor(VertexEntity.TypeKey, m.toString)
// TODO Need to add neighbours
// TODO Need to add neighbours
val neighbors = ArrayBuffer[EntityId]()
val nInEdges = inEdgePartition(m.partitionId)(m.vertexId)
val nInEdges = partitionInDegree(m.partitionId)(m.vertexId)
// println("eid mirror, ", m.partitionId, m.vertexId, nInEdges)

totalMirrorsInitialized = blockInitMirror(mirrorERef, m, eid, neighbors, nInEdges, totalMirrorsInitialized)
// totalMirrorsInitialized =
// blockInitMirror(mirrorERef, m, eid, neighbors, nInEdges, totalMirrorsInitialized)
}
}
}
Expand Down Expand Up @@ -204,18 +253,31 @@ object EntityManager {
partitionMap: collection.mutable.Map[Int, Int],
mainArray: Array[Main],
pid: Int,
inEdgePartition: Array[collection.mutable.Map[Int, Int]]
partitionInDegree: Array[collection.mutable.Map[Int, Int]],
tmpmains: Array[(Int, Set[Int], List[(Int, Int, Int)], Int)],
tmpmirrors: Array[(Int, Int, List[(Int, Int, Int)], Int)]
): Behavior[EntityManager.Command] = Behaviors.setup(ctx => {
val EntityManagerKey =
ServiceKey[EntityManager.Command](s"entityManager${pid}")
ctx.system.receptionist ! Receptionist.Register(EntityManagerKey, ctx.self)
new EntityManager(ctx, partitionMap, mainArray, pid, inEdgePartition)
new EntityManager(
ctx,
partitionMap,
mainArray,
pid,
partitionInDegree,
tmpmains,
tmpmirrors
)
})
// command/response typedef
sealed trait Command extends CborSerializable
sealed trait Response extends CborSerializable

// Sync Main/Mirror Initialization
final case object InitializeMains extends Command
final case object InitializeMirrors extends Command

final case class Initialize(
entityClass: String,
vertexId: Int,
Expand Down
Loading

0 comments on commit 074caea

Please sign in to comment.