Skip to content

Commit

Permalink
Add support for serializing synchronized collections
Browse files Browse the repository at this point in the history
  • Loading branch information
detrevid committed Mar 22, 2015
1 parent cb7e3bc commit 43adc66
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 45 deletions.
37 changes: 19 additions & 18 deletions core/build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -15,25 +15,26 @@
name := "core"

libraryDependencies ++= Seq(
"com.github.scopt" %% "scopt" % "3.2.0",
"com.google.code.gson" % "gson" % "2.2.4",
"com.google.guava" % "guava" % "18.0",
"com.twitter" %% "chill" % "0.5.0"
"com.github.scopt" %% "scopt" % "3.2.0",
"com.google.code.gson" % "gson" % "2.2.4",
"com.google.guava" % "guava" % "18.0",
"com.twitter" %% "chill" % "0.5.0"
exclude("com.esotericsoftware.minlog", "minlog"),
"com.twitter" %% "chill-bijection" % "0.5.0",
"commons-io" % "commons-io" % "2.4",
"io.spray" %% "spray-can" % "1.3.2",
"io.spray" %% "spray-routing" % "1.3.2",
"net.jodah" % "typetools" % "0.3.1",
"org.apache.spark" %% "spark-core" % sparkVersion.value % "provided",
"org.clapper" %% "grizzled-slf4j" % "1.0.2",
"org.elasticsearch" % "elasticsearch" % elasticsearchVersion.value,
"org.json4s" %% "json4s-native" % json4sVersion.value,
"org.json4s" %% "json4s-ext" % json4sVersion.value,
"org.scalaj" %% "scalaj-http" % "1.1.0",
"org.scalatest" %% "scalatest" % "2.1.6" % "test",
"org.slf4j" % "slf4j-log4j12" % "1.7.7",
"org.specs2" %% "specs2" % "2.3.13" % "test")
"com.twitter" %% "chill-bijection" % "0.5.0",
"de.javakaffee" % "kryo-serializers" % "0.28",
"commons-io" % "commons-io" % "2.4",
"io.spray" %% "spray-can" % "1.3.2",
"io.spray" %% "spray-routing" % "1.3.2",
"net.jodah" % "typetools" % "0.3.1",
"org.apache.spark" %% "spark-core" % sparkVersion.value % "provided",
"org.clapper" %% "grizzled-slf4j" % "1.0.2",
"org.elasticsearch" % "elasticsearch" % elasticsearchVersion.value,
"org.json4s" %% "json4s-native" % json4sVersion.value,
"org.json4s" %% "json4s-ext" % json4sVersion.value,
"org.scalaj" %% "scalaj-http" % "1.1.0",
"org.scalatest" %% "scalatest" % "2.1.6" % "test",
"org.slf4j" % "slf4j-log4j12" % "1.7.7",
"org.specs2" %% "specs2" % "2.3.13" % "test")

net.virtualvoid.sbt.graph.Plugin.graphSettings

Expand Down
13 changes: 5 additions & 8 deletions core/src/main/scala/io/prediction/controller/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package io.prediction.controller

import io.prediction.workflow.KryoInstantiator

import com.twitter.chill.KryoInjection
import org.json4s._
import org.json4s.ext.JodaTimeSerializers

Expand All @@ -42,12 +41,12 @@ object Utils {
* @param model Model object.
*/
def save(id: String, model: Any): Unit = {
val tmpdir = sys.env.get("PIO_FS_TMPDIR").getOrElse(
System.getProperty("java.io.tmpdir"))
val tmpdir = sys.env.getOrElse("PIO_FS_TMPDIR", System.getProperty("java.io.tmpdir"))
val modelFile = tmpdir + File.separator + id
(new File(tmpdir)).mkdirs
val fos = new FileOutputStream(modelFile)
fos.write(KryoInjection(model))
val kryo = KryoInstantiator.newKryoInjection
fos.write(kryo(model))
fos.close
}

Expand All @@ -59,12 +58,10 @@ object Utils {
* @param id Used as the filename of the file.
*/
def load(id: String): Any = {
val tmpdir = sys.env.get("PIO_FS_TMPDIR").getOrElse(
System.getProperty("java.io.tmpdir"))
val tmpdir = sys.env.getOrElse("PIO_FS_TMPDIR", System.getProperty("java.io.tmpdir"))
val modelFile = tmpdir + File.separator + id
val src = Source.fromFile(modelFile)(scala.io.Codec.ISO8859)
val kryoInstantiator = new KryoInstantiator(getClass.getClassLoader)
val kryo = KryoInjection.instance(kryoInstantiator)
val kryo = KryoInstantiator.newKryoInjection
val m = kryo.invert(src.map(_.toByte).toArray).get
src.close
m
Expand Down
10 changes: 6 additions & 4 deletions core/src/main/scala/io/prediction/workflow/CoreWorkflow.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,6 @@

package io.prediction.workflow

import com.github.nscala_time.time.Imports.DateTime
import com.twitter.chill.KryoInjection
import grizzled.slf4j.Logger
import io.prediction.controller.EngineParams
import io.prediction.controller.Evaluation
import io.prediction.controller.WorkflowParams
Expand All @@ -29,6 +26,9 @@ import io.prediction.data.storage.EvaluationInstance
import io.prediction.data.storage.Model
import io.prediction.data.storage.Storage

import com.github.nscala_time.time.Imports.DateTime
import grizzled.slf4j.Logger

import scala.language.existentials

/** CoreWorkflow handles PredictionIO metadata and environment variables of
Expand Down Expand Up @@ -67,10 +67,12 @@ object CoreWorkflow {

val instanceId = Storage.getMetaDataEngineInstances

val kryo = KryoInstantiator.newKryoInjection

logger.info("Inserting persistent model")
Storage.getModelDataModels.insert(Model(
id = engineInstance.id,
models = KryoInjection(models)))
models = kryo(models)))

logger.info("Updating engine instance")
val engineInstances = Storage.getMetaDataEngineInstances
Expand Down
39 changes: 24 additions & 15 deletions core/src/main/scala/io/prediction/workflow/CreateServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,6 @@

package io.prediction.workflow

import akka.actor._
import akka.event.Logging
import akka.io.IO
import akka.pattern.ask
import akka.util.Timeout
import com.github.nscala_time.time.Imports.DateTime
import com.google.gson.Gson
import com.twitter.chill.KryoBase
import com.twitter.chill.KryoInjection
import com.twitter.chill.ScalaKryoInstantiator
import grizzled.slf4j.Logging
import io.prediction.controller.Engine
import io.prediction.controller.Params
import io.prediction.controller.Utils
Expand All @@ -40,6 +29,20 @@ import io.prediction.core.Doer
import io.prediction.data.storage.EngineInstance
import io.prediction.data.storage.EngineManifest
import io.prediction.data.storage.Storage

import akka.actor._
import akka.event.Logging
import akka.io.IO
import akka.pattern.ask
import akka.util.Timeout
import com.github.nscala_time.time.Imports.DateTime
import com.google.gson.Gson
import com.twitter.bijection.Injection
import com.twitter.chill.KryoBase
import com.twitter.chill.KryoInjection
import com.twitter.chill.ScalaKryoInstantiator
import de.javakaffee.kryoserializers.SynchronizedCollectionsSerializer
import grizzled.slf4j.Logging
import org.json4s._
import org.json4s.native.JsonMethods._
import org.json4s.native.Serialization.write
Expand All @@ -57,17 +60,24 @@ import scala.util.Failure
import scala.util.Random
import scala.util.Success

import java.io.PrintWriter
import java.io.StringWriter
import java.io.{Serializable, PrintWriter, StringWriter}

class KryoInstantiator(classLoader: ClassLoader) extends ScalaKryoInstantiator {
override def newKryo(): KryoBase = {
val kryo = super.newKryo()
kryo.setClassLoader(classLoader)
SynchronizedCollectionsSerializer.registerSerializers(kryo)
kryo
}
}

object KryoInstantiator extends Serializable {
def newKryoInjection : Injection[Any, Array[Byte]] = {
val kryoInstantiator = new KryoInstantiator(getClass.getClassLoader)
KryoInjection.instance(kryoInstantiator)
}
}

case class ServerConfig(
batch: String = "",
engineInstanceId: String = "",
Expand Down Expand Up @@ -190,8 +200,7 @@ object CreateServer extends Logging {

val engineParams = engine.engineInstanceToEngineParams(engineInstance)

val kryoInstantiator = new KryoInstantiator(getClass.getClassLoader)
val kryo = KryoInjection.instance(kryoInstantiator)
val kryo = KryoInstantiator.newKryoInjection

val modelsFromEngineInstance =
kryo.invert(modeldata.get(engineInstance.id).get.models).get.
Expand Down

0 comments on commit 43adc66

Please sign in to comment.