-
Notifications
You must be signed in to change notification settings - Fork 0
/
MirrorEntity.scala
119 lines (99 loc) · 3.57 KB
/
MirrorEntity.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
package com.cluster.graph.entity
import akka.actor.typed.scaladsl.{AbstractBehavior, ActorContext, Behaviors}
import akka.actor.typed.{ActorRef, Behavior}
import akka.cluster.sharding.typed.scaladsl.{ClusterSharding, EntityContext, EntityTypeKey}
import com.CborSerializable
import scala.concurrent.duration._
import VertexEntity._
import com.algorithm.VertexInfo
/**
 * Behaviour of a mirror vertex entity.
 *
 * The mirror is configured by `VertexEntity.InitializeMirror`, which supplies its vertex id,
 * partition id, neighbours, partition in-degree and the id of its main vertex. Once the number
 * of neighbour messages counted for a super step equals the partition in-degree, the mirror
 * sends a `MirrorTotal` for that step to its main vertex (see [[applyIfReady]]).
 *
 * State such as `vertexId`, `partitionId`, `neighbors`, `partitionInDegree`, `thisVertexInfo`,
 * `neighbourCounter` and `summedTotal` is not declared in this class; presumably it comes from
 * the `VertexEntity` trait -- confirm there.
 *
 * @param ctx           actor context of this entity
 * @param nodeAddress   address string of the hosting node; only used in log output here
 * @param entityContext sharding context providing this entity's id and its shard reference
 */
class MirrorEntity(
    ctx: ActorContext[VertexEntity.Command],
    nodeAddress: String,
    entityContext: EntityContext[VertexEntity.Command]
) extends AbstractBehavior[VertexEntity.Command](ctx)
    with VertexEntity {

  // Id of the main vertex this mirror reports to. Empty until InitializeMirror has been
  // handled; modelled as Option so an early message cannot trigger a NullPointerException.
  private var main: Option[EntityId] = None

  var value = 0 // Counter TEST ONLY

  // Entity references for outgoing messages are looked up through sharding.entityRefFor
  // using the target's entity id.
  val sharding = ClusterSharding(ctx.system)

  /**
   * Logs `event` at info level together with this actor's path, the node address and the
   * entity id.
   *
   * `event` is passed as a logging argument rather than spliced into the format string, so
   * a literal "{}" inside the event text cannot shift the other placeholders.
   */
  override def ctxLog(event: String): Unit =
    ctx.log.info(
      "******************{} {} at {}, eid: {}",
      ctx.self.path,
      event,
      nodeAddress,
      entityContext.entityId
    )

  /**
   * Handles one incoming command. Every branch except `StopVertex` keeps the current
   * behaviour; `StopVertex` stops the actor.
   */
  override def onMessage(
      msg: VertexEntity.Command
  ): Behavior[VertexEntity.Command] =
    msg match {
      case VertexEntity.InitializeMirror(vid, pid, m, neighs, inDeg, replyTo) =>
        vertexId = vid
        partitionId = pid.toShort
        neighbors = neighs
        main = Option(m) // a null id is treated the same as "not initialised"
        partitionInDegree = inDeg
        thisVertexInfo = VertexInfo(vertexId, neighbors.size)
        ctxLog(s"Received ask to initialize Mirror ${vertexId}_${partitionId}")
        replyTo ! VertexEntity.InitializeResponse(s"Initialized Mirror ${vertexId}_${partitionId}")
        Behaviors.same

      // GAS Actions
      case VertexEntity.Begin(_) =>
        ctxLog("Beginning compute")
        value += 1
        Behaviors.same

      case VertexEntity.End =>
        ctxLog(s"Ordered to stop $msg")
        // TODO Needed?
        Behaviors.same

      case c: VertexEntity.NeighbourMessage =>
        reactToNeighbourMessage(c)

      case ApplyResult(stepNum, oldVal, newVal) =>
        ctxLog(s"Received apply value from Main $newVal")
        localScatter(stepNum, oldVal, newVal, sharding)
        Behaviors.same

      case VertexEntity.Idle =>
        // Ask the shard to passivate this entity instead of stopping directly.
        entityContext.shard ! ClusterSharding.Passivate(ctx.self)
        Behaviors.same

      case VertexEntity.StopVertex =>
        Behaviors.stopped(() => ctxLog("stopping ... passivated for idling"))

      // Counter actions TESTING ONLY
      case VertexEntity.Increment =>
        ctxLog("adding")
        value += 1
        Behaviors.same

      case VertexEntity.GetValue(replyTo) =>
        ctxLog("get value")
        replyTo ! VertexEntity.SubTtl(entityContext.entityId, value)
        Behaviors.same

      case VertexEntity.EchoValue =>
        ctxLog("echo (logging only) value")
        Behaviors.same

      case _ =>
        // Commands a mirror does not handle are logged and otherwise ignored.
        ctxLog(s"Unknown behaviour for mirror $msg")
        Behaviors.same
    }

  /**
   * Sends `MirrorTotal(stepNum, summedTotal.get(stepNum))` to the main vertex once
   * `neighbourCounter(stepNum)` equals `partitionInDegree`.
   *
   * If the mirror has not been initialised yet (no main vertex id recorded), nothing is
   * sent and a warning is logged instead of failing with a NullPointerException.
   *
   * @param stepNum super step whose neighbour count is checked
   */
  override def applyIfReady(stepNum: SuperStep): Unit =
    if (neighbourCounter(stepNum) == partitionInDegree) {
      main match {
        case Some(mainId) =>
          val cmd = MirrorTotal(stepNum, summedTotal.get(stepNum))
          val mainRef = sharding.entityRefFor(VertexEntity.TypeKey, mainId.toString())
          mainRef ! cmd
        case None =>
          ctx.log.warn(
            s"Mirror ${entityContext.entityId} has no main vertex yet; " +
              s"not sending MirrorTotal for step $stepNum"
          )
      }
    }
}
/** Companion of [[MirrorEntity]]: holds its sharding type key and the behaviour factory. */
object MirrorEntity {

  /** Entity type key named "MirrorEntity" for `VertexEntity.Command` messages. */
  val TypeKey: EntityTypeKey[VertexEntity.Command] =
    EntityTypeKey[VertexEntity.Command]("MirrorEntity")

  /**
   * Builds the behaviour of a mirror entity.
   *
   * A receive timeout is installed during setup so that `VertexEntity.Idle` is delivered
   * to the entity after 30 seconds without any incoming message.
   *
   * @param nodeAddress   address string handed to the new [[MirrorEntity]]
   * @param entityContext sharding context handed to the new [[MirrorEntity]]
   * @return the behaviour backed by a freshly constructed [[MirrorEntity]]
   */
  def apply(
      nodeAddress: String,
      entityContext: EntityContext[VertexEntity.Command]
  ): Behavior[VertexEntity.Command] =
    Behaviors.setup[VertexEntity.Command] { actorCtx =>
      val idleAfter = 30.seconds
      actorCtx.setReceiveTimeout(idleAfter, VertexEntity.Idle)
      new MirrorEntity(actorCtx, nodeAddress, entityContext)
    }
}