Skip to content

Commit

Permalink
Replace deprecated Enumerator.imperative with Concurrent.broadcast
Browse files Browse the repository at this point in the history
  • Loading branch information
ornicar committed Jun 3, 2012
1 parent bbefe1c commit e1ae333
Show file tree
Hide file tree
Showing 15 changed files with 42 additions and 34 deletions.
4 changes: 2 additions & 2 deletions app/lobby/Hub.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ final class Hub(
case WithHooks(op) op(hookOwnerIds).unsafePerformIO

case Join(uid, username, version, hookOwnerId) {
val channel = Enumerator.imperative[JsValue]()
val (enumerator, channel) = Concurrent.broadcast[JsValue]
addMember(uid, Member(channel, username, hookOwnerId))
sender ! Connected(channel)
sender ! Connected(enumerator, channel)
}

case Talk(txt, u) messenger(txt, u).unsafePerformIO foreach { message
Expand Down
4 changes: 2 additions & 2 deletions app/lobby/Socket.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ final class Socket(hub: ActorRef) {
version versionOption
uid uidOption
} yield (hub ? Join(uid, username, version, hook)).asPromise map {
case Connected(channel)
case Connected(enumerator, channel)
val iteratee = Iteratee.foreach[JsValue] { e
e str "t" match {
case Some("talk") for {
Expand All @@ -44,7 +44,7 @@ final class Socket(hub: ActorRef) {
} mapDone { _
hub ! Quit(uid)
}
(iteratee, channel)
(iteratee, enumerator)
}: SocketPromise
promise | Util.connectionFail
}
Expand Down
6 changes: 4 additions & 2 deletions app/lobby/actorApi.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import game.DbGame
import scalaz.effects.IO

case class Member(
channel: Channel,
channel: JsChannel,
username: Option[String],
hookOwnerId: Option[String]) extends SocketMember {

Expand All @@ -26,4 +26,6 @@ case class Join(
version: Int,
hookOwnerId: Option[String])
case class Talk(txt: String, u: String)
case class Connected(channel: Channel)
case class Connected(
enumerator: JsEnumerator,
channel: JsChannel)
4 changes: 2 additions & 2 deletions app/monitor/Hub.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ final class Hub(timeout: Int) extends HubActor[Member](timeout) {
def receiveSpecific = {

case Join(uid) {
val channel = Enumerator.imperative[JsValue]()
val (enumerator, channel) = Concurrent.broadcast[JsValue]
addMember(uid, Member(channel))
sender ! Connected(channel)
sender ! Connected(enumerator, channel)
}

case MonitorData(data) notifyAll("monitor", JsString(data mkString ";"))
Expand Down
4 changes: 2 additions & 2 deletions app/monitor/Socket.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ final class Socket(hub: ActorRef) {
val promise: Option[SocketPromise] = for {
uid uidOption
} yield (hub ? Join(uid)).asPromise map {
case Connected(channel)
case Connected(enumerator, channel)
val iteratee = Iteratee.foreach[JsValue] { e
e str "t" match {
case Some("p") hub ! Ping(uid)
Expand All @@ -31,7 +31,7 @@ final class Socket(hub: ActorRef) {
} mapDone { _
hub ! Quit(uid)
}
(iteratee, channel)
(iteratee, enumerator)
}
promise | Util.connectionFail
}
Expand Down
6 changes: 4 additions & 2 deletions app/monitor/actorApi.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@ case object GetMonitorData

case class Update(env: CoreEnv)

case class Member(channel: Channel) extends SocketMember {
case class Member(channel: JsChannel) extends SocketMember {
val username = none
}

case class Join(uid: String)
case class Connected(channel: Channel)
case class Connected(
enumerator: JsEnumerator,
channel: JsChannel)
case class MonitorData(data: List[String])
10 changes: 5 additions & 5 deletions app/package.scala
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import ornicar.scalalib._

import play.api.libs.json.JsValue
import play.api.libs.iteratee.{ Iteratee, Enumerator }
import play.api.libs.concurrent.Promise
import play.api.libs.iteratee.PushEnumerator
import play.api.libs.iteratee.{ Iteratee, Enumerator }
import play.api.libs.iteratee.Concurrent.Channel

import com.novus.salat._
import com.mongodb.casbah.commons.conversions.scala.RegisterJodaTimeConversionHelpers
Expand All @@ -17,9 +17,9 @@ package object lila
with scalaz.Lists
with scalaz.Booleans {

type Channel = PushEnumerator[JsValue]

type SocketPromise = Promise[(Iteratee[JsValue, _], Enumerator[JsValue])]
type JsChannel = Channel[JsValue]
type JsEnumerator = Enumerator[JsValue]
type SocketPromise = Promise[(Iteratee[JsValue, _], JsEnumerator)]

// custom salat context
implicit val customSalatContext = new Context {
Expand Down
6 changes: 3 additions & 3 deletions app/round/Hub.scala
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,13 @@ final class Hub(
case IsGone(_, color) sender ! playerIsGone(color)

case Join(uid, username, version, color, owner) {
val channel = Enumerator.imperative[JsValue]()
val (enumerator, channel) = Concurrent.broadcast[JsValue]
val member = Member(channel, username, PovRef(gameId, color), owner)
addMember(uid, member)
notify(crowdEvent :: Nil)
if (playerIsGone(color)) notifyGone(color, false)
playerTime(color, nowMillis)
sender ! Connected(member)
sender ! Connected(enumerator, member)
}

case Events(events) notify(events)
Expand All @@ -74,7 +74,7 @@ final class Hub(
}

case Close {
members.values foreach { _.channel.close() }
members.values foreach { _.channel.end() }
self ! PoisonPill
}
}
Expand Down
4 changes: 2 additions & 2 deletions app/round/Socket.scala
Original file line number Diff line number Diff line change
Expand Up @@ -118,13 +118,13 @@ final class Socket(
color = pov.color,
owner = owner
) map {
case Connected(member) (
case Connected(enumerator, member) (
Iteratee.foreach[JsValue](
controller(hub, uid, member, PovRef(pov.gameId, member.color))
) mapDone { _
hub ! Quit(uid)
},
member.channel)
enumerator)
}
} yield socket).asPromise: SocketPromise
}) | connectionFail
Expand Down
10 changes: 6 additions & 4 deletions app/round/actorApi.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ sealed trait Member extends SocketMember {

object Member {
def apply(
channel: Channel,
channel: JsChannel,
username: Option[String],
ref: PovRef,
owner: Boolean): Member =
Expand All @@ -31,15 +31,15 @@ object Member {
}

case class Owner(
channel: Channel,
channel: JsChannel,
username: Option[String],
ref: PovRef) extends Member {

val owner = true
}

case class Watcher(
channel: Channel,
channel: JsChannel,
username: Option[String],
ref: PovRef) extends Member {

Expand All @@ -52,7 +52,9 @@ case class Join(
version: Int,
color: Color,
owner: Boolean)
case class Connected(member: Member)
case class Connected(
enumerator: JsEnumerator,
member: Member)
case class Events(events: List[Event])
case class GameEvents(gameId: String, events: List[Event])
case class GetGameVersion(gameId: String)
Expand Down
4 changes: 2 additions & 2 deletions app/site/Hub.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ final class Hub(timeout: Int) extends HubActor[Member](timeout) {
def receiveSpecific = {

case Join(uid, username) {
val channel = Enumerator.imperative[JsValue]()
val (enumerator, channel) = Concurrent.broadcast[JsValue]
addMember(uid, Member(channel, username))
sender ! Connected(channel)
sender ! Connected(enumerator, channel)
}
}
}
4 changes: 2 additions & 2 deletions app/site/Socket.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ final class Socket(hub: ActorRef) {
val promise: Option[SocketPromise] = for {
uid uidOption
} yield (hub ? Join(uid, username)).asPromise map {
case Connected(channel)
case Connected(enumerator, channel)
val iteratee = Iteratee.foreach[JsValue] { e
e str "t" match {
case Some("p") hub ! Ping(uid)
Expand All @@ -32,7 +32,7 @@ final class Socket(hub: ActorRef) {
} mapDone { _
hub ! Quit(uid)
}
(iteratee, channel)
(iteratee, enumerator)
}
promise | Util.connectionFail
}
Expand Down
6 changes: 4 additions & 2 deletions app/site/actorApi.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ package site
import socket.SocketMember

case class Member(
channel: Channel,
channel: JsChannel,
username: Option[String]) extends SocketMember

case class Join(
uid: String,
username: Option[String])
case class Connected(channel: Channel)
case class Connected(
enumerator: JsEnumerator,
channel: JsChannel)
2 changes: 1 addition & 1 deletion app/socket/HubActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ abstract class HubActor[M <: SocketMember](uidTimeout: Int) extends Actor {

def eject(uid: String) {
members get uid foreach { member
member.channel.close()
member.channel.end()
quit(uid)
}
}
Expand Down
2 changes: 1 addition & 1 deletion app/socket/actorApi.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ package socket
import play.api.libs.json.JsObject

trait SocketMember {
val channel: Channel
val channel: JsChannel
val username: Option[String]

lazy val userId: Option[String] = username map (_.toLowerCase)
Expand Down

0 comments on commit e1ae333

Please sign in to comment.