Skip to content

Commit

Permalink
Merge branch 'main' of github.com:atrostan/akka-gps into preproc/hybr…
Browse files Browse the repository at this point in the history
…id-partitioning
  • Loading branch information
atrostan committed Nov 23, 2021
2 parents bfb95f7 + 9a01498 commit 6b4f845
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 5 deletions.
12 changes: 10 additions & 2 deletions src/main/scala/com/cluster/graph/entity/MainEntity.scala
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,14 @@ class MainEntity(
// Vote to terminate
println(s"step: ${stepNum} term v${this.vertexId}: color:${currentValue}")
pcRef ! PartitionCoordinator.TerminationVote(stepNum) // TODO change to new PC command

// Still have to send null messages to neighbours
localScatter(stepNum, currentValue, None, sharding)
val cmd = ApplyResult(stepNum, currentValue, None)
for (mirror <- mirrors) {
val mirrorRef = sharding.entityRefFor(VertexEntity.TypeKey, mirror.toString())
mirrorRef ! cmd
}
}
case _ => {
// Continue
Expand All @@ -145,15 +153,15 @@ class MainEntity(
val oldVal = currentValue
currentValue = newVal
println(s"step: ${stepNum} cont v${this.vertexId}: color:${currentValue}")
val cmd = ApplyResult(stepNum, oldVal, newVal)
val cmd = ApplyResult(stepNum, oldVal, Some(newVal))
for (mirror <- mirrors) {
// println(mirror)
val mirrorRef = sharding.entityRefFor(VertexEntity.TypeKey, mirror.toString())
// println(mirrorRef)
mirrorRef ! cmd
}
active = !vertexProgram.voteToHalt(oldVal, newVal)
localScatter(stepNum, oldVal, newVal, sharding)
localScatter(stepNum, oldVal, Some(newVal), sharding)
pcRef ! PartitionCoordinator.DONE(stepNum) // TODO change to new PC command
}
}
Expand Down
6 changes: 3 additions & 3 deletions src/main/scala/com/cluster/graph/entity/VertexEntity.scala
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ object VertexEntity {

// GAS
final case class MirrorTotal(stepNum: Int, total: Option[AccumulatorT]) extends Command
final case class ApplyResult(stepNum: Int, oldVal: VertexValT, newVal: VertexValT) extends Command
final case class ApplyResult(stepNum: Int, oldVal: VertexValT, newVal: Option[VertexValT]) extends Command

// Counter actions TESTING ONLY
case object Increment extends Command
Expand Down Expand Up @@ -112,10 +112,10 @@ trait VertexEntity {
def localScatter(
stepNum: SuperStep,
oldValue: VertexValT,
newValue: VertexValT,
newValue: Option[VertexValT],
shardingRef: ClusterSharding
): Unit = {
val msgOption = vertexProgram.scatter(vertexId, oldValue, newValue)
val msgOption: Option[MessageT] = newValue.flatMap(vertexProgram.scatter(vertexId, oldValue, _))

for (neighbor <- neighbors) {
// TODO 0 edgeVal for now, we need to implement these. Depends on neighbor!
Expand Down

0 comments on commit 6b4f845

Please sign in to comment.