Skip to content

Commit

Permalink
Merge pull request #16 from atrostan/pagerank
Browse files Browse the repository at this point in the history
Pagerank
  • Loading branch information
jporemba committed Nov 28, 2021
2 parents e84d506 + 5f558c8 commit 489fc09
Show file tree
Hide file tree
Showing 11 changed files with 174 additions and 30 deletions.
13 changes: 7 additions & 6 deletions src/main/scala/com/algorithm/LocalMaximaColouring.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@ case class Colour(num: Int) {
require(num >= 0)
}

trait LocalMaximalColouringAbstractMode extends VertexProgram[Int, Int, Int, Set[Int], Option[Colour]] {
trait LocalMaximalColouringAbstractMode extends VertexProgram[Int, Int, Set[Int], Option[Colour]] {
override def gather(edgeVal: Int, message: Int): Set[Int] = Set(message)

override def sum(a: Set[Int], b: Set[Int]): Set[Int] = a.union(b)

override def apply(
superStepNumber: Int,
thisVertexId: Int,
thisVertex: VertexInfo,
oldVal: Option[Colour],
total: Option[Set[Int]]
): Option[Colour] = {
Expand All @@ -27,7 +27,7 @@ trait LocalMaximalColouringAbstractMode extends VertexProgram[Int, Int, Int, Set
Some(Colour(superStepNumber - 1))
}
case Some(idSet) => {
if (idSet.max < thisVertexId) {
if (idSet.max < thisVertex.id) {
// Colour myself with superstep number
Some(Colour(superStepNumber - 1))
} else {
Expand All @@ -41,17 +41,18 @@ trait LocalMaximalColouringAbstractMode extends VertexProgram[Int, Int, Int, Set
}

override def scatter(
thisVertexId: Int,
superStepNumber: Int,
thisVertex: VertexInfo,
oldVal: Option[Colour],
newVal: Option[Colour]
): Option[Int] = {
newVal match {
case None => Some(thisVertexId)
case None => Some(thisVertex.id)
case Some(colour) => None
}
}

override def voteToHalt(oldVal: Option[Colour], newVal: Option[Colour]): Boolean = {
override def voteToHalt(superStepNumber: Int, oldVal: Option[Colour], newVal: Option[Colour]): Boolean = {
newVal match {
case None => false
case Some(colour) => true
Expand Down
42 changes: 42 additions & 0 deletions src/main/scala/com/algorithm/PageRank.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package com.algorithm

// Non-normalized PageRank
// Assumes no sink vertices
class PageRank(iters: Int) extends VertexProgram[Int, Double, Double, Double] {

require(iters >= 1)

override val mode: VertexProgram.Mode = VertexProgram.Outwards

override def gather(edgeVal: Int, message: Double): Double = message

override def sum(a: Double, b: Double): Double = a + b

override def apply(superStepNumber: Int, thisVertex: VertexInfo, oldVal: Double, total: Option[Double]): Double = {
if (superStepNumber == 0) {
// Do nothing on first superstep except send PR to neighbours
oldVal
} else {
// println(s"StepNum: ${superStepNumber}")
val sum = total.getOrElse(0.0) // Safe: should have received something from every neighbour
0.15 + (0.85 * sum)
}
}

override def scatter(superStepNumber: Int, thisVertex: VertexInfo, oldVal: Double, newVal: Double): Option[Double] = {
if(superStepNumber >= iters) {
None
} else {
Some(newVal / thisVertex.degree)
}
}

override def voteToHalt(superStepNumber: Int, oldVal: Double, newVal: Double): Boolean = {
superStepNumber >= iters
}

override val defaultVertexValue: Double = 1.0

override val defaultActivationStatus: Boolean = true

}
10 changes: 5 additions & 5 deletions src/main/scala/com/algorithm/SSSP.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.algorithm

object SSSP extends VertexProgram[Int, Int, Int, Int, Int] {
object SSSP extends VertexProgram[Int, Int, Int, Int] {

override val mode = VertexProgram.Outwards

Expand All @@ -14,11 +14,11 @@ object SSSP extends VertexProgram[Int, Int, Int, Int, Int] {

override def apply(
superStepNumber: Int,
thisVertexId: Int,
thisVertex: VertexInfo,
oldVal: Int,
total: Option[Int]
): Int = {
if (thisVertexId == 0) {
if (thisVertex.id == 0) {
0
} else {
total match {
Expand All @@ -28,7 +28,7 @@ object SSSP extends VertexProgram[Int, Int, Int, Int, Int] {
}
}

override def scatter(thisVertexId: Int, oldVal: Int, newVal: Int): Option[Int] = {
override def scatter(superStepNumber: Int, thisVertex: VertexInfo, oldVal: Int, newVal: Int): Option[Int] = {
if (newVal < oldVal) {
Some(newVal)
} else {
Expand All @@ -37,7 +37,7 @@ object SSSP extends VertexProgram[Int, Int, Int, Int, Int] {
}
}

override def voteToHalt(oldVal: Int, newVal: Int): Boolean = true
override def voteToHalt(superStepNumber: Int, oldVal: Int, newVal: Int): Boolean = true

override val defaultActivationStatus: Boolean = true

Expand Down
14 changes: 8 additions & 6 deletions src/main/scala/com/algorithm/SequentialRun.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ import com.algorithm.VertexProgram.Inwards
import com.algorithm.VertexProgram.Bidirectional

object SequentialRun {
def apply[VertexIdT, MessageT, AccumulatorT, VertexValT](
vertexProgram: VertexProgram[VertexIdT, Int, MessageT, AccumulatorT, VertexValT],
graph: Graph[VertexIdT, WDiEdge]
def apply[MessageT, AccumulatorT, VertexValT](
vertexProgram: VertexProgram[Int, MessageT, AccumulatorT, VertexValT],
graph: Graph[Int, WDiEdge]
): Map[graph.NodeT, VertexValT] = {

type Vertex = graph.NodeT
Expand Down Expand Up @@ -42,6 +42,8 @@ object SequentialRun {
}
}

val vertexInfoMap = vertices.map(v => (v, VertexInfo(v.value, relevantEdges(v).size))).toMap

var progressFlag = true

while (progressFlag) {
Expand Down Expand Up @@ -75,18 +77,18 @@ object SequentialRun {

// Apply
val oldVal = states(vtx)
val newVal = vertexProgram.apply(superstep, vtx.value, oldVal, finalAccumulator)
val newVal = vertexProgram.apply(superstep, vertexInfoMap(vtx), oldVal, finalAccumulator)
states = states.updated(vtx, newVal)

// Scatter
for {
(msgDest, edge) <- relevantEdges(vtx)
msg <- vertexProgram.scatter(vtx, oldVal, newVal)
msg <- vertexProgram.scatter(superstep, vertexInfoMap(vtx), oldVal, newVal)
} {
sendMessage(msgDest, edge, msg)
}

val activation = !vertexProgram.voteToHalt(oldVal, newVal)
val activation = !vertexProgram.voteToHalt(superstep, oldVal, newVal)
activeMap = activeMap.updated(vtx, activation)
}

Expand Down
16 changes: 12 additions & 4 deletions src/main/scala/com/algorithm/VertexProgram.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,11 @@ object VertexProgram {
case object Bidirectional extends Mode // Send messages to both out-neighbours and in-neighbours
}


// Stateless
trait VertexProgram[VertexIdT, EdgeValT, MessageT, AccumulatorT, VertexValT] {
trait VertexProgram[EdgeValT, MessageT, AccumulatorT, VertexValT] {

type VertexIdT = Int

val mode: VertexProgram.Mode

Expand All @@ -18,16 +21,21 @@ trait VertexProgram[VertexIdT, EdgeValT, MessageT, AccumulatorT, VertexValT] {

def apply(
superStepNumber: Int,
thisVertexId: VertexIdT,
thisVertex: VertexInfo,
oldVal: VertexValT,
total: Option[AccumulatorT]
): VertexValT

def scatter(thisVertexId: VertexIdT, oldVal: VertexValT, newVal: VertexValT): Option[MessageT]
def scatter(superStepNumber: Int, thisVertex: VertexInfo, oldVal: VertexValT, newVal: VertexValT): Option[MessageT]

def voteToHalt(oldVal: VertexValT, newVal: VertexValT): Boolean
def voteToHalt(superStepNumber: Int, oldVal: VertexValT, newVal: VertexValT): Boolean

val defaultVertexValue: VertexValT

val defaultActivationStatus: Boolean
}

case class VertexInfo(
id: Int,
degree: Int
)
10 changes: 5 additions & 5 deletions src/main/scala/com/algorithm/WCC.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import scalax.collection.Graph // or scalax.collection.mutable.Graph
import scalax.collection.GraphPredef._, scalax.collection.GraphEdge._
import scalax.collection.edge.WDiEdge

object WCC extends VertexProgram[Int, Int, Int, Int, Int] {
object WCC extends VertexProgram[Int, Int, Int, Int] {

override val mode: VertexProgram.Mode = VertexProgram.Bidirectional

Expand All @@ -17,16 +17,16 @@ object WCC extends VertexProgram[Int, Int, Int, Int, Int] {
Math.min(a, b)
}

override def apply(superStepNumber: Int, thisVertexId: Int, oldVal: Int, total: Option[Int]): Int = {
override def apply(superStepNumber: Int, thisVertex: VertexInfo, oldVal: Int, total: Option[Int]): Int = {
if(superStepNumber == 0) {
thisVertexId
thisVertex.id
} else total match {
case Some(componentId) => Math.min(oldVal, componentId)
case None => oldVal
}
}

override def scatter(thisVertexId: Int, oldVal: Int, newVal: Int): Option[Int] = {
override def scatter(superStepNumber: Int, thisVertex: VertexInfo, oldVal: Int, newVal: Int): Option[Int] = {
if(newVal < oldVal) {
Some(newVal)
} else {
Expand All @@ -35,7 +35,7 @@ object WCC extends VertexProgram[Int, Int, Int, Int, Int] {
}
}

override def voteToHalt(oldVal: Int, newVal: Int): Boolean = true
override def voteToHalt(superStepNumber: Int, oldVal: Int, newVal: Int): Boolean = true

override val defaultVertexValue: Int = Integer.MAX_VALUE

Expand Down
6 changes: 4 additions & 2 deletions src/main/scala/com/cluster/graph/entity/MainEntity.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import akka.cluster.sharding.typed.scaladsl.{ClusterSharding, EntityContext, Ent

import com.cluster.graph.PartitionCoordinator
import VertexEntity._
import com.algorithm.VertexInfo

// Vertex actor
class MainEntity(
Expand Down Expand Up @@ -51,6 +52,7 @@ class MainEntity(
neighbors = neigh
mirrors = mrs
partitionInDegree = inDeg
thisVertexInfo = VertexInfo(vertexId, neighbors.size)


val logStr = s"Received ask to initialize Main ${vertexId}_${partitionId}"
Expand Down Expand Up @@ -154,7 +156,7 @@ class MainEntity(
case _ => {
// Continue

val newVal = vertexProgram.apply(stepNum, vertexId, currentValue, total)
val newVal = vertexProgram.apply(stepNum, thisVertexInfo, currentValue, total)
val oldVal = currentValue
currentValue = newVal
println(s"step: ${stepNum} cont v${this.vertexId}: color:${currentValue}")
Expand All @@ -165,7 +167,7 @@ class MainEntity(
// println(mirrorRef)
mirrorRef ! cmd
}
active = !vertexProgram.voteToHalt(oldVal, newVal)
active = !vertexProgram.voteToHalt(stepNum, oldVal, newVal)
localScatter(stepNum, oldVal, Some(newVal), sharding)
pcRef ! PartitionCoordinator.DONE(stepNum) // TODO change to new PC command
}
Expand Down
2 changes: 2 additions & 0 deletions src/main/scala/com/cluster/graph/entity/MirrorEntity.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import com.CborSerializable
import scala.concurrent.duration._

import VertexEntity._
import com.algorithm.VertexInfo

class MirrorEntity(
ctx: ActorContext[VertexEntity.Command],
Expand Down Expand Up @@ -42,6 +43,7 @@ class MirrorEntity(
neighbors = neighs
main = m
partitionInDegree = inDeg
thisVertexInfo = VertexInfo(vertexId, neighbors.size)
val logStr = s"Received ask to initialize Mirror ${vertexId}_${partitionId}"
ctxLog(logStr)
replyTo ! VertexEntity.InitializeResponse(s"Initialized Mirror ${vertexId}_${partitionId}")
Expand Down
5 changes: 4 additions & 1 deletion src/main/scala/com/cluster/graph/entity/VertexEntity.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import akka.cluster.sharding.typed.scaladsl.{ClusterSharding, EntityContext, Ent
import com.CborSerializable
import com.algorithm._
import com.cluster.graph.PartitionCoordinator
import com.algorithm.VertexInfo

object VertexEntity {
// Hard coded for now
Expand Down Expand Up @@ -105,6 +106,8 @@ trait VertexEntity {
val neighbourCounter: mutable.Map[SuperStep, Int] = new mutable.HashMap().withDefaultValue(0)
var value: Int

var thisVertexInfo: VertexInfo = null

def ctxLog(event: String): Unit

// Check if ready to perform role in the apply phase, then begin if ready
Expand All @@ -116,7 +119,7 @@ trait VertexEntity {
newValue: Option[VertexValT],
shardingRef: ClusterSharding
): Unit = {
val msgOption: Option[MessageT] = newValue.flatMap(vertexProgram.scatter(vertexId, oldValue, _))
val msgOption: Option[MessageT] = newValue.flatMap(vertexProgram.scatter(stepNum, thisVertexInfo, oldValue, _))

for (neighbor <- neighbors) {
// TODO 0 edgeVal for now, we need to implement these. Depends on neighbor!
Expand Down
32 changes: 32 additions & 0 deletions src/test/scala/com/algorithm/EpsilonCloseMatcher.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package com.algorithm

import org.scalatest.matchers.MatchResult
import org.scalatest.matchers.Matcher

object EpsilonCloseMatching {

class EpsilonCloseCheck[T](epsilon: Double, expected: Map[T, Double]) extends Matcher[Map[T, Double]] {

def apply(left: Map[T, Double]): MatchResult = {

val bad = expected.filter { case (k, v) => {
left.get(k) match {
case None => false
case Some(v_prime) => Math.abs(v - v_prime) > epsilon
}
}}

MatchResult(
left.keySet == expected.keySet && bad.isEmpty,
s"Map ${left} not sufficiently close to Map ${expected}",
"Maps sufficiently close"
)
}

}

def BeEpsilonClose[T](epsilon: Double)(expected: Map[T, Double]): Matcher[Map[T, Double]] = new EpsilonCloseCheck(epsilon, expected)

}


Loading

0 comments on commit 489fc09

Please sign in to comment.