Skip to content

Commit

Permalink
Merge pull request #9 from atrostan/coord/partition-global
Browse files Browse the repository at this point in the history
Coord/partition global
  • Loading branch information
atrostan committed Nov 15, 2021
2 parents 2de43ae + 6a86f8f commit 62c505d
Show file tree
Hide file tree
Showing 34 changed files with 1,573 additions and 518 deletions.
5 changes: 5 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ lazy val `akka-gps` = project
scalaVersion := "2.12.15",
Compile / scalacOptions ++= Seq("-deprecation", "-feature", "-unchecked", "-Xlog-reflective-calls", "-Xlint", "-target:jvm-1.8"),
Compile / javacOptions ++= Seq("-Xlint:unchecked", "-Xlint:deprecation"),
Compile / PB.targets := Seq(
scalapb.gen() -> (Compile / sourceManaged).value / "scalapb"
),
run / javaOptions ++= Seq("-Xms128m", "-Xmx8G", "-XX:+UseG1GC", "-Djava.library.path=./target/native", "-Dlog4j.configuration=src/main/resources/log4j.properties"),
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-actor-typed" % akkaVersion,
Expand All @@ -56,3 +59,5 @@ lazy val `akka-gps` = project
.configs (MultiJvm)




3 changes: 3 additions & 0 deletions project/protoc.sbt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
addSbtPlugin("com.thesamet" % "sbt-protoc" % "1.0.2")

libraryDependencies += "com.thesamet.scalapb" %% "compilerplugin" % "0.11.3"
2 changes: 1 addition & 1 deletion src/main/resources/cluster.conf
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ akka {
"akka:https://[email protected]:25251",
# "akka:https://[email protected]:25252"
]
min-nr-of-members = 4
min-nr-of-members = 1
role {
shard.min-nr-of-members = 4
}
Expand Down
16 changes: 5 additions & 11 deletions src/main/resources/log4j.properties
Original file line number Diff line number Diff line change
@@ -1,31 +1,25 @@
log4j.logger.com.preprocessing.edgeList=WARN, LOG_FILE

log4j.rootCategory=INFO, LOG_FILE

log4j.appender.LOG_FILE = org.apache.log4j.RollingFileAppender
log4j.appender.LOG_FILE=org.apache.log4j.RollingFileAppender
log4j.appender.LOG_FILE.MaxBackupIndex=5
log4j.appender.LOG_FILE.MaxFileSize=1MB
log4j.appender.LOG_FILE.File = ./logs/spark/spark_application.log
log4j.appender.LOG_FILE.Append = false
log4j.appender.LOG_FILE.layout = org.apache.log4j.PatternLayout
log4j.appender.LOG_FILE.layout.ConversionPattern = %d{yyyy-MM-dd HH:mm:ss} %c{1} [%p] %m%n

log4j.appender.LOG_FILE.File=./logs/spark/spark_application.log
log4j.appender.LOG_FILE.Append=false
log4j.appender.LOG_FILE.layout=org.apache.log4j.PatternLayout
log4j.appender.LOG_FILE.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %c{1} [%p] %m%n
# Set the default spark-shell log level to WARN. When running the spark-shell, the
# log level for this class is used to overwrite the root logger's log level, so that
# the user can have different defaults for the shell and regular Spark apps.
log4j.logger.org.apache.spark.repl.Main=WARN

# Settings to quiet third party logs that are too verbose
log4j.logger.org.sparkproject.jetty=WARN
log4j.logger.org.sparkproject.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO

# SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs
# in SparkSQL with Hive support
log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR

# Parquet related logging
log4j.logger.org.apache.parquet.CorruptStatistics=ERROR
log4j.logger.parquet.CorruptStatistics=ERROR
28 changes: 14 additions & 14 deletions src/main/resources/logback.xml
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@
<configuration>
<!-- This is a development logging configuration that logs to standard out, for an example of a production
logging config, see the Akka docs: https://doc.akka.io/docs/akka/2.6/typed/logging.html#logback -->
<!-- <appender name="STDOUT" target="System.out" class="ch.qos.logback.core.ConsoleAppender">-->
<!-- <encoder>-->
<!-- <pattern>[%date{ISO8601}] [%level] [%logger] [%thread] [%X{akkaSource}] - %msg%n</pattern>-->
<!-- </encoder>-->
<!-- </appender>-->
<!-- <appender name="STDOUT" target="System.out" class="ch.qos.logback.core.ConsoleAppender">-->
<!-- <encoder>-->
<!-- <pattern>[%date{ISO8601}] [%level] [%logger] [%thread] [%X{akkaSource}] - %msg%n</pattern>-->
<!-- </encoder>-->
<!-- </appender>-->

<!-- <appender name="ASYNC" class="ch.qos.logback.classic.AsyncAppender">-->
<!-- <queueSize>1024</queueSize>-->
<!-- <neverBlock>true</neverBlock>-->
<!-- <appender-ref ref="STDOUT" />-->
<!-- </appender>-->
<!-- <appender name="ASYNC" class="ch.qos.logback.classic.AsyncAppender">-->
<!-- <queueSize>1024</queueSize>-->
<!-- <neverBlock>true</neverBlock>-->
<!-- <appender-ref ref="STDOUT" />-->
<!-- </appender>-->

<!-- <root level="INFO">-->
<!-- <appender-ref ref="ASYNC"/>-->
<!-- </root>-->
<!-- <root level="INFO">-->
<!-- <appender-ref ref="ASYNC"/>-->
<!-- </root>-->

<appender name="FILE" class="ch.qos.logback.core.FileAppender">
<file>${application.home:-.}/logs/application.log</file>
Expand All @@ -27,7 +27,7 @@
<appender name="ASYNC" class="ch.qos.logback.classic.AsyncAppender">
<queueSize>1024</queueSize>
<neverBlock>true</neverBlock>
<appender-ref ref="FILE" />
<appender-ref ref="FILE"/>
</appender>

<root level="INFO">
Expand Down
9 changes: 4 additions & 5 deletions src/main/scala/com/CborSerializable.scala
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
package com

/**
* Marker trait to tell Akka to serialize messages into CBOR using Jackson for sending over the network
* See tmpapplication.conf where it is bound to a serializer.
* For more details see the docs https://doc.akka.io/docs/akka/2.6/serialization-jackson.html
*/
/** Marker trait to tell Akka to serialize messages into CBOR using Jackson for sending over the
* network See tmpapplication.conf where it is bound to a serializer. For more details see the docs
* https://doc.akka.io/docs/akka/2.6/serialization-jackson.html
*/
trait CborSerializable
28 changes: 28 additions & 0 deletions src/main/scala/com/Conversion.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package com
import akka.actor.typed
import akka.actor.typed.ActorRefResolver
import akka.actor.typed.scaladsl.adapter._
import akka.serialization.Serialization
import scalapb.TypeMapper

import java.nio.charset.StandardCharsets.UTF_8

// argh
// https://github.com/akka/akka/issues/27975
// https://discuss.lightbend.com/t/akka-typed-serialization/4336/10
// https://stackoverflow.com/questions/57427953/in-akka-typed-how-to-deserialize-a-serialized-actorref-without-its-actorsystem
object Conversion {
type ActorRef[-T] = typed.ActorRef[T] // importable via Conversion._

lazy val resolver: ActorRefResolver = ActorRefResolver {
Serialization.getCurrentTransportInformation().system.toTyped
}

implicit def mapper[T]: TypeMapper[String, ActorRef[T]] =
TypeMapper[String, ActorRef[T]](resolver.resolveActorRef)(serialize)

def deserialize[T](str: String) = resolver.resolveActorRef[T](str)
def serialize[T](ref: ActorRef[T]) =
new String(resolver.toSerializationFormat(ref).getBytes(UTF_8), UTF_8)

}
33 changes: 33 additions & 0 deletions src/main/scala/com/Typedefs.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package com

import akka.actor.typed.ActorRef
import akka.cluster.Member
import com.cluster.graph.{EntityManager, GlobalCoordinator, PartitionCoordinator}
import com.preprocessing.partitioning.oneDim.Mirror

/** Store all global type aliases here (typedefs)
*/
object Typedefs {

// the address of a PartitionCoordinator
type PCRef = ActorRef[PartitionCoordinator.Command]

// the address of the GlobalCoordinator
type GCRef = ActorRef[GlobalCoordinator.Command]
type EMRef = ActorRef[EntityManager.Command]
// empty at vertex id i, if no vertex mirror exists in partition
// otherwise, contains reference to vertex mirror
// TODO; optimization; a bitSet to indicate existence of mirror in partition
type MirrorMap = collection.mutable.Map[Int, Mirror]
type MemberSet = collection.mutable.Set[Member]

object MirrorMap {
def empty: MirrorMap = collection.mutable.Map.empty
def apply(ms: (Int, Mirror)*): MirrorMap = collection.mutable.Map(ms: _*)
}

object MemberSet {
def empty: MemberSet = collection.mutable.Set.empty
def apply(ms: (Member)*): MemberSet = collection.mutable.Set(ms: _*)
}
}
16 changes: 16 additions & 0 deletions src/main/scala/com/Util.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com

import akka.actor.typed.{ActorRef, ActorRefResolver}

import java.nio.charset.StandardCharsets

object Util {
def serializeActorRef(resolver: ActorRefResolver, ref: ActorRef[_]): String = {
val serializedActorRef: Array[Byte] =
resolver.toSerializationFormat(ref).getBytes(StandardCharsets.UTF_8)
new String(serializedActorRef, StandardCharsets.UTF_8)
}
// def deserializeActorRef(resolver: ActorRefResolver, str: String, T: type): ActorRef[Any] = {
// resolver.resolveActorRef[SomeType.type](str)
// }
}
21 changes: 15 additions & 6 deletions src/main/scala/com/algorithm/LocalMaximaColouring.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.algorithm

case class Colour(num: Int) {
require(num >= 0)
require(num >= 0)
}

object LocalMaximaColouring extends VertexProgram[Int, Int, Int, Set[Int], Option[Colour]] {
Expand All @@ -10,8 +10,13 @@ object LocalMaximaColouring extends VertexProgram[Int, Int, Int, Set[Int], Optio

override def sum(a: Set[Int], b: Set[Int]): Set[Int] = a.union(b)

override def apply(superStepNumber: Int, thisVertexId: Int, oldVal: Option[Colour], total: Option[Set[Int]]): Option[Colour] = {
if(superStepNumber == 0) {
override def apply(
superStepNumber: Int,
thisVertexId: Int,
oldVal: Option[Colour],
total: Option[Set[Int]]
): Option[Colour] = {
if (superStepNumber == 0) {
None
} else {
oldVal match {
Expand All @@ -36,16 +41,20 @@ object LocalMaximaColouring extends VertexProgram[Int, Int, Int, Set[Int], Optio
}
}

override def scatter(thisVertexId: Int, oldVal: Option[Colour], newVal: Option[Colour]): Option[Int] = {
override def scatter(
thisVertexId: Int,
oldVal: Option[Colour],
newVal: Option[Colour]
): Option[Int] = {
newVal match {
case None => Some(thisVertexId)
case None => Some(thisVertexId)
case Some(colour) => None
}
}

override def voteToHalt(oldVal: Option[Colour], newVal: Option[Colour]): Boolean = {
newVal match {
case None => false
case None => false
case Some(colour) => true
}
}
Expand Down
13 changes: 9 additions & 4 deletions src/main/scala/com/algorithm/SSSP.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,24 @@ object SSSP extends VertexProgram[Int, Int, Int, Int, Int] {
Math.min(a, b)
}

override def apply(superStepNumber: Int, thisVertexId: Int, oldVal: Int, total: Option[Int]): Int = {
if(thisVertexId == 0) {
override def apply(
superStepNumber: Int,
thisVertexId: Int,
oldVal: Int,
total: Option[Int]
): Int = {
if (thisVertexId == 0) {
0
} else {
total match {
case Some(value) => Math.min(oldVal, value)
case None => oldVal
case None => oldVal
}
}
}

override def scatter(thisVertexId: Int, oldVal: Int, newVal: Int): Option[Int] = {
if(newVal < oldVal) {
if (newVal < oldVal) {
Some(newVal)
} else {
assert(newVal == oldVal, s"Unexpected newVal=${newVal}, oldVal=${oldVal}")
Expand Down
20 changes: 12 additions & 8 deletions src/main/scala/com/algorithm/SequentialRun.scala
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
package com.algorithm

import scalax.collection.edge.Implicits._
import scalax.collection.Graph // or scalax.collection.mutable.Graph
import scalax.collection.GraphPredef._, scalax.collection.GraphEdge._
import scalax.collection.Graph
import scalax.collection.edge.WDiEdge

object SequentialRun {
def apply[VertexIdT, MessageT, AccumulatorT, VertexValT](vertexProgram: VertexProgram[VertexIdT, Int, MessageT, AccumulatorT, VertexValT], graph: Graph[VertexIdT, WDiEdge]): Map[graph.NodeT, VertexValT] = {
def apply[VertexIdT, MessageT, AccumulatorT, VertexValT](
vertexProgram: VertexProgram[VertexIdT, Int, MessageT, AccumulatorT, VertexValT],
graph: Graph[VertexIdT, WDiEdge]
)(
initialStates: Map[graph.NodeT, VertexValT],
initialActiveMap: Map[graph.NodeT, Boolean]
): Map[graph.NodeT, VertexValT] = {

type Vertex = graph.NodeT
// var vertices: Seq[Int] = graph.nodes.toSeq.mzap({x:graph.NodeT => x.value})
Expand All @@ -29,18 +33,18 @@ object SequentialRun {
}
def outEdges(src: Vertex): Iterable[graph.EdgeT] = {
graph.edges.filter(edge => edge._1 == src)
}
}

var progressFlag = true

while(progressFlag) {
while (progressFlag) {
// Superstep
superstep += 1
// println("Superstep: " + superstep)
// println("States : " + states)
// println("Messages : " + currentMailboxes)
progressFlag = false

// Iterate over vertices
for {
vtx <- vertices
Expand All @@ -56,7 +60,7 @@ object SequentialRun {
case (accOption, (edge, msg)) => {
val gatheredMsg = vertexProgram.gather(edge.weight.toInt, msg)
accOption match {
case None => Some(gatheredMsg)
case None => Some(gatheredMsg)
case Some(accSoFar) => Some(vertexProgram.sum(accSoFar, gatheredMsg))
}
}
Expand Down
11 changes: 6 additions & 5 deletions src/main/scala/com/algorithm/VertexProgram.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,12 @@ trait VertexProgram[VertexIdT, EdgeValT, MessageT, AccumulatorT, VertexValT] {

def sum(a: AccumulatorT, b: AccumulatorT): AccumulatorT

def apply(superStepNumber: Int, thisVertexId: VertexIdT, oldVal: VertexValT, total: Option[AccumulatorT]): VertexValT
def apply(
superStepNumber: Int,
thisVertexId: VertexIdT,
oldVal: VertexValT,
total: Option[AccumulatorT]
): VertexValT

def scatter(thisVertexId: VertexIdT, oldVal: VertexValT, newVal: VertexValT): Option[MessageT]

Expand All @@ -15,7 +20,3 @@ trait VertexProgram[VertexIdT, EdgeValT, MessageT, AccumulatorT, VertexValT] {
val defaultVertexValue: VertexValT
val defaultActivationStatus: Boolean
}




Loading

0 comments on commit 62c505d

Please sign in to comment.