Commit 804b4a9
clean build.sbt;
add symmetric rmat graph for testing colouring;
shutdown actorsystems in cluster sharding app;
clean up entitymanager;
atrostan committed Nov 29, 2021
1 parent 50c9984 commit 804b4a9
Showing 8 changed files with 176 additions and 156 deletions.
8 changes: 0 additions & 8 deletions build.sbt
@@ -19,14 +19,6 @@ ThisBuild / assemblyMergeStrategy := {
case x => MergeStrategy.first
}

//ThisBuild / assemblyMergeStrategy := {
// case PathList("META-INF", "services", "org.apache.hadoop.fs.FileSystem") =>
// MergeStrategy.filterDistinctLines
//// case PathList("META-INF", "MANIFEST.MF") => MergeStrategy.discard
// case PathList("META-INF", ps @ _*) => MergeStrategy.discard
// case x => MergeStrategy.first
//}

lazy val `akka-gps` = project
.in(file("."))
.settings(multiJvmSettings: _*)
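
Only the tail of the retained merge strategy is visible in this hunk. A plausible reconstruction of the full block, assumed from the commented-out copy deleted above rather than taken verbatim from the repository:

// Sketch: assumed full merge strategy, reconstructed from the deleted
// commented-out copy; the retained block is truncated in this diff.
ThisBuild / assemblyMergeStrategy := {
  case PathList("META-INF", "services", "org.apache.hadoop.fs.FileSystem") =>
    MergeStrategy.filterDistinctLines
  case PathList("META-INF", ps @ _*) => MergeStrategy.discard
  case x                             => MergeStrategy.first
}

Merging the Hadoop FileSystem service file with filterDistinctLines presumably keeps both the HDFS and local implementations registered in the fat jar, which the fs.hdfs.impl / fs.file.impl settings in ClusterShardingApp below rely on.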
2 changes: 1 addition & 1 deletion scripts/spark/preprocess.sh
@@ -74,7 +74,7 @@ partitionerClass="${preprocessingPackage}.partitioning.Driver"
aggregatorClass="${preprocessingPackage}.aggregation.Driver"

# directory that stores the graph
graphName="8rmat"
graphName="symmRmat"
graphDir="${akka_gps_home}/src/main/resources/graphs/${graphName}"

# original, uncompressed edgelist
128 changes: 128 additions & 0 deletions src/main/resources/graphs/symmRmat/orig.net
@@ -0,0 +1,128 @@
0 1
1 0
0 2
2 0
0 3
3 0
0 6
6 0
0 32
32 0
0 34
34 0
0 48
48 0
0 57
57 0
1 0
0 1
1 4
4 1
1 8
8 1
1 17
17 1
1 32
32 1
1 34
34 1
1 40
40 1
1 42
42 1
2 32
32 2
3 0
0 3
3 32
32 3
3 48
48 3
3 56
56 3
4 20
20 4
4 32
32 4
5 0
0 5
8 0
0 8
8 2
2 8
8 3
3 8
8 4
4 8
8 32
32 8
8 36
36 8
8 48
48 8
9 0
0 9
9 1
1 9
9 36
36 9
9 40
40 9
10 50
50 10
12 48
48 12
14 0
0 14
16 0
0 16
16 2
2 16
16 32
32 16
16 43
43 16
16 48
48 16
17 4
4 17
17 33
33 17
17 41
41 17
24 38
38 24
24 51
51 24
32 0
0 32
32 3
3 32
33 4
4 33
33 8
8 33
34 0
0 34
35 32
32 35
36 0
0 36
36 38
38 36
40 0
0 40
40 4
4 40
41 36
36 41
44 15
15 44
48 0
0 48
48 8
8 48
48 17
17 48
50 0
0 50
2 changes: 2 additions & 0 deletions src/main/resources/graphs/symmRmat/stats.yml
@@ -0,0 +1,2 @@
Nodes: 33
Edges: 114
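
stats.yml lists 33 nodes and 114 edges, while orig.net has 128 lines: the raw file stores both orientations of every undirected edge and also contains some duplicate lines. A minimal standalone sketch (not part of this commit) that deduplicates the list, checks that it is symmetric, and reproduces both counts:

// Sketch: deduplicate orig.net, verify every (u, v) has its reverse (v, u),
// and recompute the stats.yml counts. Hypothetical check, not in the repo.
import scala.io.Source

object CheckSymmRmat {
  def main(args: Array[String]): Unit = {
    val edges = Source
      .fromFile("src/main/resources/graphs/symmRmat/orig.net")
      .getLines()
      .filter(_.trim.nonEmpty)
      .map { line =>
        val Array(u, v) = line.trim.split("\\s+").map(_.toInt)
        (u, v)
      }
      .toSet // drops the duplicated edge lines (128 lines -> 114 edges)
    val missing = edges.filterNot { case (u, v) => edges.contains((v, u)) }
    assert(missing.isEmpty, s"missing reverse edges: $missing")
    println(s"Nodes: ${edges.flatMap { case (u, v) => List(u, v) }.size}") // 33
    println(s"Edges: ${edges.size}") // 114
  }
}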
8 changes: 4 additions & 4 deletions src/main/resources/paths.yaml
@@ -1,5 +1,5 @@
workers:
- /home/atrostan/Workspace/akka-gps/src/main/resources/graphs/8rmat/partitions/hybrid/bySrc/p0
- /home/atrostan/Workspace/akka-gps/src/main/resources/graphs/8rmat/partitions/hybrid/bySrc/p1
- /home/atrostan/Workspace/akka-gps/src/main/resources/graphs/8rmat/partitions/hybrid/bySrc/p2
- /home/atrostan/Workspace/akka-gps/src/main/resources/graphs/8rmat/partitions/hybrid/bySrc/p3
- /home/atrostan/Workspace/akka-gps/src/main/resources/graphs/symmRmat/partitions/hybrid/bySrc/p0
- /home/atrostan/Workspace/akka-gps/src/main/resources/graphs/symmRmat/partitions/hybrid/bySrc/p1
- /home/atrostan/Workspace/akka-gps/src/main/resources/graphs/symmRmat/partitions/hybrid/bySrc/p2
- /home/atrostan/Workspace/akka-gps/src/main/resources/graphs/symmRmat/partitions/hybrid/bySrc/p3
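
The order of the workers list is significant: entry i is presumably the path for partition i, which is how readWorkerPathsFromYaml (used in ClusterShardingApp below; its implementation is not shown in this diff) would build its Map[Int, String]. A minimal sketch under that assumption:

// Sketch: parse the `workers` list into a partition-id -> path map.
// Assumption: partition id = position in the list; the real
// readWorkerPathsFromYaml may differ.
import scala.io.Source

def readWorkerPaths(yamlPath: String): Map[Int, String] =
  Source
    .fromFile(yamlPath)
    .getLines()
    .map(_.trim)
    .filter(_.startsWith("- "))
    .map(_.stripPrefix("- "))
    .zipWithIndex
    .map { case (path, pid) => pid -> path }
    .toMap

For example, readWorkerPaths("src/main/resources/paths.yaml")(0) would yield the p0 path.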
70 changes: 24 additions & 46 deletions src/main/scala/com/cluster/graph/ClusterShardingApp.scala
@@ -6,17 +6,12 @@ import com.Typedefs.{EMRef, GCRef, PCRef}
import com.cluster.graph.EntityManager.{InitializeMains, InitializeMirrors}
import com.cluster.graph.Init._
import com.cluster.graph.PartitionCoordinator.BroadcastLocation
import com.cluster.graph.entity.{EntityId, VertexEntityType}
import com.preprocessing.partitioning.Util.{readMainPartitionDF, readMirrorPartitionDF, readWorkerPathsFromYaml}
import com.preprocessing.partitioning.oneDim.{Main, Mirror}
import com.typesafe.config.{Config, ConfigFactory}
import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable.ArrayBuffer
import akka.actor.typed.scaladsl.AskPattern.Askable
import scala.collection.mutable.ArrayBuffer
import com.cluster.graph.entity.{EntityId, VertexEntityType}
import com.preprocessing.partitioning.oneDim.{Main, Mirror}
import com.algorithm.Colour
import scala.concurrent.{Await, Future}
import akka.util.Timeout
@@ -60,7 +55,7 @@ object ClusterShardingApp {
val sc = new SparkContext(conf)
sc.setLogLevel("ERROR")
val spark: SparkSession = SparkSession.builder.getOrCreate

val actorSystems = ArrayBuffer[ActorSystem[ClusterEvent.ClusterDomainEvent with EntityManager.Command]]()
val hadoopConfig = sc.hadoopConfiguration
hadoopConfig.set("fs.hdfs.impl", classOf[org.apache.hadoop.hdfs.DistributedFileSystem].getName)
hadoopConfig.set("fs.file.impl", classOf[org.apache.hadoop.fs.LocalFileSystem].getName)
@@ -69,8 +64,6 @@
// a map between partition ids to location on hdfs of mains, mirrors for that partition
val workerMap: Map[Int, String] = readWorkerPathsFromYaml(workerPaths: String)

val png = initGraphPartitioning(numberOfShards)

val config = ConfigFactory.load("cluster")
println(config.getConfig("ec2"))
val nodesUp = collection.mutable.Set[Member]()
@@ -86,9 +79,8 @@
"ClusterSystem",
domainListenerConfig
)

actorSystems.append(domainListener)
val shardPorts = ArrayBuffer[Int](25252, 25253, 25254, 25255)
val shardActors = ArrayBuffer[ActorSystem[EntityManager.Command]]()
var pid = 0
var nMains = 0
var nMirrors = 0
@@ -107,25 +99,26 @@
partCoordMap(pid) = pcPort

val entityManager = ActorSystem[EntityManager.Command](
EntityManager(partitionMap, png.mainArray, pid, png.inEdgePartition, mains, mirrors),
EntityManager(partitionMap, pid, mains, mirrors),
"ClusterSystem",
shardConfig
)
shardActors += entityManager

actorSystems.append(entityManager)
pid += 1
}

sc.stop()

val frontPort = shardPorts.last + 1
val frontRole = "front"

val frontConfig = createConfig(frontRole, frontPort)
val entityManager = ActorSystem[EntityManager.Command](
EntityManager(partitionMap, png.mainArray, pid, png.inEdgePartition, null, null),
// the global entity manager is not assigned any mains, mirrors
EntityManager(partitionMap, pid, null, null),
"ClusterSystem",
frontConfig
)
actorSystems.append(entityManager)

println("Blocking until all cluster members are up...")
blockUntilAllMembersUp(domainListener, nNodes)
@@ -163,16 +156,12 @@
println("Broadcasting the Global Coordinator address to all Partition Coordinators")
broadcastGCtoPCs(gcRef, entityManager.scheduler)

println("here are the em refs:")
emRefs.foreach(println)

for (pid <- 0 until numberOfShards) {
val emRef = emRefs(pid)
emRef ! InitializeMains
emRef ! InitializeMirrors
}


var nMainsInitialized = 0
var nMirrorsInitialized = 0

@@ -182,33 +171,13 @@
nMirrorsInitialized += getNMirrorsInitialized(entityManager, emRef)
}


println("Checking that all Mains, Mirrors have been initialized...")
println(s"Total Mains Initialized: $nMainsInitialized")
println(s"Total Mirrors Initialized: $nMirrorsInitialized")
assert(nMainsInitialized == nMains)
assert(nMirrorsInitialized == nMirrors)
return

println(s"Initializing ${nMains} Mains and ${nMirrors} Mirrors...")
for (main <- png.mainArray) {
entityManager ! EntityManager.Initialize(
VertexEntityType.Main.toString(),
main.id,
main.partition.id,
main.neighbors.map(n =>
n match {
case neighbor: Main =>
new EntityId(VertexEntityType.Main.toString(), neighbor.id, neighbor.partition.id)
case neighbor: Mirror =>
new EntityId(VertexEntityType.Mirror.toString(), neighbor.id, neighbor.partition.id)
}
)
)
}


// after initialization, each partition coordinator should broadcast its location to its mains
for ((pid, pcRef) <- pcRefs) {
println(s"PC${pid} Broadcasting location to its main")
pcRef ! BroadcastLocation()
@@ -237,20 +206,29 @@
val sched = entityManager.scheduler
val future: Future[GlobalCoordinator.FinalValuesResponse] = gcRef.ask(ref => GlobalCoordinator.GetFinalValues(ref))(timeout,sched)
Await.result(future, Duration.Inf) match {
case FinalValuesResponseComplete(valueMap) => {
case FinalValuesResponseComplete(valueMap) =>
finalVals = valueMap
}
case FinalValuesResponseNotFinished => ()
}

}

println("Final Values from main app:")
for((key, value) <- finalVals){
println(s"$key -> $value")
finalVals.map{ case (k, v) =>
v match {
case Some(c: Colour) => (k, c.num)
case _ => (k, -1)
}
}.toList.sorted.foreach(println)

def shutdown(systems: ArrayBuffer[ActorSystem[ClusterEvent.ClusterDomainEvent with EntityManager.Command]]): Unit = {
for (s <- systems) {
println(s"terminating ${s}")
s.terminate()
}
}

// TODO shut down actor system
shutdown(actorSystems)
println("Bye Bye!")

// increment mains and their mirrors
// for (main <- png.mainArray)
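
Note that the new shutdown helper only initiates termination: terminate() on a typed ActorSystem is asynchronous. A variant that blocks the driver until every system has actually stopped might look like this sketch (an assumption, not what the commit does):

// Sketch: block until each ActorSystem has fully terminated.
// Assumption: the committed shutdown() deliberately fires and forgets.
import scala.concurrent.Await
import scala.concurrent.duration.Duration

def shutdownAndAwait(
    systems: ArrayBuffer[ActorSystem[ClusterEvent.ClusterDomainEvent with EntityManager.Command]]
): Unit = {
  systems.foreach(_.terminate())
  systems.foreach(s => Await.result(s.whenTerminated, Duration.Inf))
}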
(Diffs for the remaining 2 changed files did not load.)
