diff --git a/core/build.sbt b/core/build.sbt
index 15f432d2c8..2b7d3c1d7b 100644
--- a/core/build.sbt
+++ b/core/build.sbt
@@ -1,11 +1,17 @@
 name := "core"
 
+
+//"com.typesafe.akka" %% "akka-agent" % "2.3.2",
+//"com.typesafe.akka" %% "akka-cluster" % "2.3.2",
+//"com.typesafe.akka" %% "akka-remote" % "2.3.2",
 libraryDependencies ++= Seq(
   "ch.qos.logback" % "logback-classic" % "1.1.2",
   "com.twitter" %% "chill" % "0.3.6" exclude("com.esotericsoftware.minlog", "minlog"),
   "com.typesafe" % "config" % "1.2.1",
-  "com.typesafe.akka" %% "akka-contrib" % "2.3.2",
-  "com.typesafe.akka" %% "akka-testkit" % "2.3.2",
+  "com.typesafe.akka" %% "akka-actor" % "2.3.2" % "provided",
+  "com.typesafe.akka" %% "akka-contrib" % "2.3.2" % "provided",
+  "com.typesafe.akka" %% "akka-testkit" % "2.3.2" % "provided",
+  "com.typesafe.akka" %% "akka-slf4j" % "2.3.2" % "provided",
   "commons-io" % "commons-io" % "2.4",
   "org.clapper" %% "grizzled-slf4j" % "1.0.2",
   "org.mongodb" %% "casbah" % "2.7.2",
diff --git a/core/src/main/scala/core/BaseController.scala b/core/src/main/scala/core/BaseController.scala
index 5fb3b2cb9d..37a1577561 100644
--- a/core/src/main/scala/core/BaseController.scala
+++ b/core/src/main/scala/core/BaseController.scala
@@ -1,6 +1,6 @@
 package io.prediction.core
 
-import scala.collection.Iterable
+//import scala.collection.Iterable
 import scala.reflect.Manifest
 
 // FIXME(yipjustin). I am being lazy...
@@ -67,10 +67,11 @@ abstract class BaseAlgorithm[
     new PredictionSeq[F, P, BaseActual](data = output)
   }
 
-  def predictSpark[TD <: BaseModel](
-    input: (Iterable[TD], Iterable[(BaseFeature, BaseActual)])
+  //def predictSpark[M <: BaseModel](
+  def predictSpark(
+    input: (Iterable[BaseModel], Iterable[(BaseFeature, BaseActual)])
   ): Iterable[(BaseFeature, BasePrediction, BaseActual)] = {
-    val model = input._1.head.asInstanceOf[M]
+    val model: M = input._1.head.asInstanceOf[M]
     val validationSeq = input._2.map{ case(f, a) => {
       val ff = f.asInstanceOf[F]
diff --git a/core/src/main/scala/core/BaseData.scala b/core/src/main/scala/core/BaseData.scala
index 7eb23e55ad..f9c3c77f83 100644
--- a/core/src/main/scala/core/BaseData.scala
+++ b/core/src/main/scala/core/BaseData.scala
@@ -30,7 +30,8 @@ trait BaseParams extends Serializable {
 */
 
 // Below are internal classes used by PIO workflow
-trait BasePersistentData extends AnyRef {}
+//trait BasePersistentData extends AnyRef {}
+trait BasePersistentData extends Serializable {}
 
 trait BaseValidationSeq extends BasePersistentData {}
diff --git a/engines/src/main/scala/itemrank/Data.scala b/engines/src/main/scala/itemrank/Data.scala
index 0bb247e8f4..c2f6148d74 100644
--- a/engines/src/main/scala/itemrank/Data.scala
+++ b/engines/src/main/scala/itemrank/Data.scala
@@ -75,13 +75,13 @@ class ItemTD(
   val itypes: Seq[String],
   val starttime: Option[Long],
   val endtime: Option[Long],
-  val inactive: Boolean)
+  val inactive: Boolean) extends Serializable
 
 class RatingTD(
   val uindex: Int,
   val iindex: Int,
   val rating: Int,
-  val t: Long)
+  val t: Long) extends Serializable
 
 class TrainingData(
   val users: Map[Int, String], // uindex->uid
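The extends Serializable changes above (BasePersistentData, ItemTD, RatingTD) exist so that workflow data objects can travel through Spark's default Java serialization. A minimal sketch of the failure they avoid, using a hypothetical TD class and a local SparkContext; none of these names are part of the patch:

    import org.apache.spark.{SparkConf, SparkContext}

    // Stand-in for a training-data class. Without "extends Serializable",
    // collect() below fails with java.io.NotSerializableException under the
    // default Java serializer.
    class TD(val uindex: Int) extends Serializable

    object SerializableSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("sketch").setMaster("local"))
        val tds = sc.parallelize(1 to 3).map(i => new TD(i)).collect()
        tds.foreach(td => println(td.uindex))
        sc.stop()
      }
    }
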
diff --git a/engines/src/main/scala/itemrank/Engine.scala b/engines/src/main/scala/itemrank/Engine.scala
index c71876f8ea..4fedb7e473 100644
--- a/engines/src/main/scala/itemrank/Engine.scala
+++ b/engines/src/main/scala/itemrank/Engine.scala
@@ -5,7 +5,9 @@ import io.prediction.EngineFactory
 import io.prediction.{ DefaultCleanser, DefaultServer }
 
 object ItemRankEngine extends EngineFactory {
-  override def apply(): AbstractEngine = {
+  //override def apply(): AbstractEngine = {
+  override def apply()
+  : BaseEngine[TrainingData,TrainingData,Feature,Prediction] = {
     new BaseEngine(
       classOf[DefaultCleanser[TrainingData]],
       Map("knn" -> classOf[KNNAlgorithm],
diff --git a/engines/src/main/scala/itemrank/ItemRankEvaluator.scala b/engines/src/main/scala/itemrank/ItemRankEvaluator.scala
index 25f366f5e7..81a1021a44 100644
--- a/engines/src/main/scala/itemrank/ItemRankEvaluator.scala
+++ b/engines/src/main/scala/itemrank/ItemRankEvaluator.scala
@@ -18,12 +18,24 @@ import scala.math.BigDecimal
 
 object ItemRankEvaluator extends EvaluatorFactory {
   val config = new Config
-  val usersDb = config.getAppdataUsers
-  val itemsDb = config.getAppdataItems
-  val u2iDb = config.getAppdataU2IActions
-  val itemSetsDb = config.getAppdataItemSets
-
-  override def apply(): AbstractEvaluator = {
+  val usersDb = config.getAppdataUsers()
+  val itemsDb = config.getAppdataItems()
+  val u2iDb = config.getAppdataU2IActions()
+  val itemSetsDb = config.getAppdataItemSets()
+
+  //override def apply(): AbstractEvaluator = {
+  override def apply()
+  : BaseEvaluator[EvalParams,
+      EvalParams,
+      TrainDataPrepParams,
+      ValidationDataPrepParams,
+      TrainingData,
+      Feature,
+      Prediction,
+      Actual,
+      ValidationUnit,
+      EmptyData,
+      EmptyData] = {
     new BaseEvaluator(
       classOf[ItemRankDataPreparator],
       classOf[ItemRankValidator])
@@ -42,10 +54,11 @@ class ItemRankDataPreparator
   final val CONFLICT_HIGHEST: String = "highest"
   final val CONFLICT_LOWEST: String = "lowest"
 
-  val usersDb = ItemRankEvaluator.usersDb
-  val itemsDb = ItemRankEvaluator.itemsDb
-  val u2iDb = ItemRankEvaluator.u2iDb
-  val itemSetsDb = ItemRankEvaluator.itemSetsDb
+  // Connection object makes the class not serializable.
+  //@transient val usersDb = ItemRankEvaluator.usersDb
+  //@transient val itemsDb = ItemRankEvaluator.itemsDb
+  //@transient val u2iDb = ItemRankEvaluator.u2iDb
+  //@transient val itemSetsDb = ItemRankEvaluator.itemSetsDb
 
   // Data generation
   override def getParamsSet(params: EvalParams)
@@ -83,6 +96,11 @@ class ItemRankDataPreparator
 
   override def prepareTraining(params: TrainDataPrepParams): TrainingData = {
+    val usersDb = ItemRankEvaluator.usersDb
+    val itemsDb = ItemRankEvaluator.itemsDb
+    val u2iDb = ItemRankEvaluator.u2iDb
+    val itemSetsDb = ItemRankEvaluator.itemSetsDb
+
     val usersMap: Map[String, Int] = usersDb.getByAppid(params.appid)
       .map(_.id).zipWithIndex
       .map { case (uid, index) => (uid, index + 1) }.toMap
@@ -178,6 +196,10 @@ class ItemRankDataPreparator
   // TODO: use t to generate eval data
   override def prepareValidation(params: ValidationDataPrepParams):
     Seq[(Feature, Actual)] = {
+    val usersDb = ItemRankEvaluator.usersDb
+    val itemsDb = ItemRankEvaluator.itemsDb
+    val u2iDb = ItemRankEvaluator.u2iDb
+    val itemSetsDb = ItemRankEvaluator.itemSetsDb
 
     val usersMap: Map[String, Int] = usersDb.getByAppid(params.appid)
       .map(_.id).zipWithIndex
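The ItemRankDataPreparator change above keeps the storage handles out of the class's fields, since the connection objects make the preparator non-serializable when Spark ships it to executors; each method instead looks the handles up from the ItemRankEvaluator object at call time. A sketch of the same pattern with hypothetical stand-ins (UsersClient, Connections), not the real appdata classes:

    // Pretend this wraps a MongoDB connection and is therefore not serializable.
    class UsersClient {
      def getByAppid(appid: Int): Seq[String] = Seq("u1", "u2")
    }

    object Connections {
      def users(): UsersClient = new UsersClient
    }

    class Preparator extends Serializable {
      // val usersDb = Connections.users()  // as a field it would be serialized with the instance
      def prepareTraining(appid: Int): Map[String, Int] = {
        val usersDb = Connections.users()   // acquired where it is used instead
        usersDb.getByAppid(appid).zipWithIndex
          .map { case (uid, index) => (uid, index + 1) }.toMap
      }
    }
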
diff --git a/engines/src/main/scala/stock/Data.scala b/engines/src/main/scala/stock/Data.scala
index c7e17fc4cb..0f9328be60 100644
--- a/engines/src/main/scala/stock/Data.scala
+++ b/engines/src/main/scala/stock/Data.scala
@@ -18,6 +18,7 @@ import org.saddle._
 import org.saddle.index.IndexTime
 import com.github.nscala_time.time.Imports._
 import breeze.linalg.{ DenseMatrix, DenseVector }
+import com.twitter.chill.MeatLocker
 
 // Use data after baseData.
 // Afterwards, slicing uses idx
@@ -57,6 +58,11 @@ class TrainingData(
     val lastDate = price.rowIx.last.get
     s"TrainingData $firstDate $lastDate"
   }
+  /*
+  private def writeObject(oos: ObjectOutputStream): Unit = {
+
+  }
+  */
 }
 
 class Model(
@@ -65,10 +71,13 @@ class Feature(
   // This is different from TrainingData. This serves as input for algorithm.
   // Hence, the time series should be shorter than that of TrainingData.
-  val data: Frame[DateTime, String, Double]) extends BaseFeature {
+  //val data: Frame[DateTime, String, Double]) extends BaseFeature {
+  val boxedData: MeatLocker[Frame[DateTime, String, Double]]) extends BaseFeature {
   override def toString(): String = {
-    val firstDate = data.rowIx.first.get
-    val lastDate = data.rowIx.last.get
+    //val firstDate = data.rowIx.first.get
+    //val lastDate = data.rowIx.last.get
+    val firstDate = boxedData.get.rowIx.first.get
+    val lastDate = boxedData.get.rowIx.last.get
     s"Feature $firstDate $lastDate"
   }
 }
diff --git a/engines/src/main/scala/stock/Evaluator.scala b/engines/src/main/scala/stock/Evaluator.scala
index ba38508cb8..b1c7aa6232 100644
--- a/engines/src/main/scala/stock/Evaluator.scala
+++ b/engines/src/main/scala/stock/Evaluator.scala
@@ -26,6 +26,7 @@ import breeze.linalg.{ DenseMatrix, DenseVector }
 import breeze.stats.{ mean, meanAndVariance }
 import nak.regress.LinearRegression
 import scala.collection.mutable.ArrayBuffer
+import com.twitter.chill.MeatLocker
 
 object StockEvaluator extends EvaluatorFactory {
   // Use singleton class here to avoid re-registering hooks in config.
@@ -218,7 +219,10 @@ class StockDataPreparator
     val featureData = Frame(data.map(e => (e._1, e._2)): _*)
     val targetData = data.map(e => (e._1, e._3)).toMap
 
-    return (new Feature(data = featureData), new Target(data = targetData))
+    //return (new Feature(data = featureData), new Target(data = targetData))
+    return (new Feature(
+      boxedData = MeatLocker(featureData)),
+      new Target(data = targetData))
   }
 
   def prepareValidation(params: ValidationDataParams)
diff --git a/engines/src/main/scala/stock/RandomAlgorithm.scala b/engines/src/main/scala/stock/RandomAlgorithm.scala
index d347070a21..2754855603 100644
--- a/engines/src/main/scala/stock/RandomAlgorithm.scala
+++ b/engines/src/main/scala/stock/RandomAlgorithm.scala
@@ -29,7 +29,8 @@ class RandomAlgorithm
   }
 
   def predict(model: EmptyModel, feature: Feature): Target = {
-    val tickers = feature.data.colIx.toVec.contents
+    //val tickers = feature.data.colIx.toVec.contents
+    val tickers = feature.boxedData.get.colIx.toVec.contents
     val prediction = tickers.map {
       ticker => (ticker, _drift + _random.nextGaussian() * _scale)
     }.toMap
diff --git a/engines/src/main/scala/stock/RegressionAlgorithm.scala b/engines/src/main/scala/stock/RegressionAlgorithm.scala
index cbcd9412ff..e0089d3b02 100644
--- a/engines/src/main/scala/stock/RegressionAlgorithm.scala
+++ b/engines/src/main/scala/stock/RegressionAlgorithm.scala
@@ -85,7 +85,8 @@ class RegressionAlgorithm
   }
 
   def predict(model: Model, feature: Feature): Target = {
-    val price: Frame[DateTime, String, Double] = feature.data
+    //val price: Frame[DateTime, String, Double] = feature.data
+    val price: Frame[DateTime, String, Double] = feature.boxedData.get
     val modelData = model.data
     val prediction = price.colIx.toVec.contents
       // If model doesn't have the data, skip
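MeatLocker, added to stock/Data.scala above, is com.twitter.chill's serializable box: the wrapper itself is java.io.Serializable and round-trips its payload through Kryo, which is what lets a Saddle Frame live inside a BaseFeature that Spark serializes. A standalone sketch with a toy payload type standing in for Frame:

    import com.twitter.chill.MeatLocker

    // Payload stands in for a type that is not java.io.Serializable,
    // such as org.saddle.Frame in the Feature class above.
    class Payload(val values: Array[Double])

    object MeatLockerSketch {
      def main(args: Array[String]): Unit = {
        val boxed = MeatLocker(new Payload(Array(1.0, 2.0, 3.0)))
        // boxed can be Java-serialized as a whole; unwrap at the point of use.
        println(boxed.get.values.sum)
      }
    }
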
diff --git a/spark/build.sbt b/spark/build.sbt
index 7ea526f777..dfb6c5e624 100644
--- a/spark/build.sbt
+++ b/spark/build.sbt
@@ -1,29 +1,44 @@
-name := "Imagine Spark"
+import AssemblyKeys._
 
-version := "0.8"
+assemblySettings
 
-scalaVersion := "2.10.4"
-
-libraryDependencies += "org.apache.spark" %% "spark-core" % "1.0.0"
+name := "pio-spark"
 
 libraryDependencies ++= Seq(
-  "ch.qos.logback" % "logback-classic" % "1.1.2",
-  "com.twitter" %% "chill" % "0.3.6",
+  "org.clapper" %% "grizzled-slf4j" % "1.0.2",
+  "org.scala-saddle" %% "saddle-core" % "1.3.2",
+  "org.scalanlp" %% "breeze" % "0.7",
+  "org.scalanlp" %% "breeze-natives" % "0.7",
+  "org.scalanlp" % "nak" % "1.2.1",
+  "org.json4s" %% "json4s-native" % "3.2.9",
+  "org.apache.spark" %% "spark-core" % "1.0.0" % "provided",
   "com.typesafe" % "config" % "1.2.1",
   "com.typesafe.akka" %% "akka-contrib" % "2.3.2",
   "com.typesafe.akka" %% "akka-testkit" % "2.3.2",
-  "commons-io" % "commons-io" % "2.4",
   "org.clapper" %% "grizzled-slf4j" % "1.0.2",
-  "org.mongodb" %% "casbah" % "2.7.2",
-  "org.scalatest" %% "scalatest" % "2.1.6" % "test",
-  "org.json4s" %% "json4s-native" % "3.2.9",
-  "org.json4s" %% "json4s-ext" % "3.2.7")
-
-libraryDependencies += "com.github.nscala-time" %% "nscala-time" % "1.2.0"
+  "com.github.scopt" %% "scopt" % "3.2.0")
+
+// "org.apache.spark" %% "spark-core" % "1.0.0" % "provided",
 
 resolvers += "Akka Repository" at "http://repo.akka.io/releases/"
 
 resolvers += Resolver.url(
   "Typesafe Releases",
-  url("http://repo.typesafe.com/typesafe/ivy-releases"))(
-  Resolver.ivyStylePatterns)
+  url("http://repo.typesafe.com/typesafe/ivy-releases"))(
+    Resolver.ivyStylePatterns)
+
+
+addCompilerPlugin("org.scala-sbt.sxr" %% "sxr" % "0.3.0")
+
+scalacOptions <<= (scalacOptions, scalaSource in Compile) map { (options, base) =>
+  options :+ ("-P:sxr:base-directory:" + base.getAbsolutePath)
+}
+
+run in Compile <<= Defaults.runTask(fullClasspath in Compile, mainClass in (Compile, run), runner in (Compile, run))
+
+mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) =>
+  {
+    case PathList("scala", xs @ _*) => MergeStrategy.discard
+    case x => old(x)
+  }
+}
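assemblySettings and the AssemblyKeys._ import above come from the sbt-assembly plugin, which this build assumes is already registered in project/. A sketch of that wiring; the plugin version here is an assumption, not something stated in the patch:

    // project/plugins.sbt (sketch; version is an assumption)
    addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2")
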
diff --git a/spark/src/main/scala/SimpleApp.scala b/spark/src/main/scala/SimpleApp.scala
index 3aa891623e..4cb1778f6c 100644
--- a/spark/src/main/scala/SimpleApp.scala
+++ b/spark/src/main/scala/SimpleApp.scala
@@ -1,8 +1,16 @@
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.SparkConf
+
+
 import io.prediction.engines.stock.LocalFileStockEvaluator
 import io.prediction.engines.stock.StockEvaluator
 import io.prediction.engines.stock.EvaluationDataParams
 import io.prediction.engines.stock.RandomAlgoParams
 import io.prediction.engines.stock.StockEngine
+import io.prediction.engines.stock.Feature
+
+import io.prediction.engines.itemrank._
 
 import io.prediction.core.BaseEvaluator
 import io.prediction.core.BaseEngine
@@ -12,15 +20,22 @@ import io.prediction.core.AbstractDataPreparator
 
 import com.github.nscala_time.time.Imports.DateTime
 
-import org.apache.spark.SparkContext
-import org.apache.spark.SparkContext._
-import org.apache.spark.SparkConf
+import org.apache.spark.serializer.KryoRegistrator
+import org.apache.spark.serializer.KryoSerializer
 
 import java.io.FileOutputStream
 import java.io.ObjectOutputStream
 import java.io.FileInputStream
 import java.io.ObjectInputStream
 
+//import org.saddle.Series
+import org.saddle._
+
+import com.twitter.chill.MeatLocker
+
+//import com.twitter.summingbird.online.Externalizer
+import com.twitter.chill.Externalizer
+import com.esotericsoftware.kryo.Kryo
 
 object WS {
   val tmpDir = "/tmp/pio/"
@@ -44,10 +59,23 @@ object WS {
   }
 }
 
+class R extends KryoRegistrator {
+  override def registerClasses(kryo: Kryo) {
+    println("XXXXXXXXXX")
+    //kryo.register(classOf[Series[Int, Int]])
+    //kryo.register(classOf[org.saddle.Series[_, _]])
+    kryo.register(classOf[X])
+  }
+}
+
+class X(val x : Int) {}
+
 object SimpleApp {
   def simple() {
     val logFile = "/home/yipjustin/client/spark/spark-1.0.0-bin-hadoop2/README.md"
     val conf = new SparkConf().setAppName("Simple Application")
+
+
     val sc = new SparkContext(conf)
     val logData = sc.textFile(logFile, 2).cache()
     val numAs = logData.filter(line => line.contains("a")).count()
@@ -57,23 +85,42 @@ object SimpleApp {
   }
 
   def test() {
+    val conf = new SparkConf().setAppName("PredictionIO")
+    //conf.set("spark.serializer", classOf[KryoSerializer].getName)
+    /*
+    conf.set("spark.serializer",
+      "org.apache.spark.serializer.KryoSerializer")
+    conf.set("spark.kryo.registrator", classOf[R].getName)
+    */
+
+    val sc = new SparkContext(conf)
+    //sc.addJar("/home/yipjustin/.ivy2/cache/org.scala-saddle/saddle-core_2.10/jars/saddle-core_2.10-1.3.2.jar")
+
+    val d = sc.parallelize(Array(1,2,3))
+
+    //val e = d.map(i => Series(i)).map(e => Externalizer[Series[Int, Int]](e))
+    val e = d.map(i => Series(i)).map(s => {
+      val f = Frame("a" -> s)
+      MeatLocker[Frame[Int, String, Int]](f)
+    })
+
+    e.collect.foreach(e => println(e.get))
+
+
+    /*
     val s = StockEvaluator()
     val dataPrep = s.dataPreparatorClass.newInstance
 
     val fn = WS.saveData(42, dataPrep)
 
     val dp = WS.loadData[AbstractDataPreparator](fn)
+    */
+    //val f: Series[Int, Int] = Series(1,2,3,4,5,6)
   }
-
-
-  def main(args: Array[String]) {
-    //test
-    //return
-
+  def stock() {
     val s = StockEvaluator()
-    //val e = StockEngine.get
     val e = StockEngine()
 
     val tickerList = Seq("GOOG", "AAPL", "FB", "GOOGL", "MSFT")
@@ -89,68 +136,63 @@ object SimpleApp {
     println(evalDataParams)
 
     val randomAlgoParams = new RandomAlgoParams(seed = 1, scale = 0.01)
-
     SparkWorkflow.run("Fizz",
+      //evalDataParams,
       evalDataParams,
+      null, /* validation params */
       //null, /* algo params */
       randomAlgoParams,
       s,
      e)
-    return
-
-
-
-    val randomAlgo = e.algorithmClassMap("random").newInstance
-
-    //val logFile = "YOUR_SPARK_HOME/README.md" // Should be some file on your system
-
-
-    //val logFile = "/Users/yipjustin/data/spark/README.md"
-    val conf = new SparkConf().setAppName("PredictionIO")
-    val sc = new SparkContext(conf)
-
-    /*
-    val logData = sc.textFile(logFile, 2).cache()
-    val numAs = logData.filter(line => line.contains("a")).count()
-    val numBs = logData.filter(line => line.contains("b")).count()
-    println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
-    */
-    //val s = LocalFileStockEvaluator()
-    //val s = StockEvaluator()
-
-
-
-
-    val dataPrep = s.dataPreparatorClass.newInstance
-
-    val localParamsSet = dataPrep.getParamsSetBase(evalDataParams)
-
-    val sparkParamsSet = sc.parallelize(localParamsSet)
-
-    sparkParamsSet.foreach(println)
-
-    val sparkTrainingSet = sparkParamsSet.map(_._1).map(dataPrep.prepareTrainingBase)
-
-    println("spark training set")
-    println(sparkTrainingSet.first)
-    sparkTrainingSet.foreach(println)
-
+  }
 
-    //randomAlgo.initBase(randomAlgoParams)
-    //val sparkModel = sparkTrainingSet.map(randomAlgo.trainBase)
+  def itemrank() {
+    val s = ItemRankEvaluator()
+    val e = ItemRankEngine()
 
+    val evalParams = new EvalParams(
+      appid = 1,
+      itypes = None,
+      actions = Map(
+        "view" -> Some(3),
+        "like" -> Some(5),
+        "conversion" -> Some(4),
+        "rate" -> None
+      ),
+      conflict = "latest",
+      //recommendationTime = 123456,
+      seenActions = Some(Set("conversion")),
+      //(int years, int months, int weeks, int days, int hours,
+      // int minutes, int seconds, int millis)
+      hours = 24,//new Period(0, 0, 0, 1, 0, 0, 0, 0),
+      trainStart = new DateTime("2014-04-01T00:00:00.000"),
+      testStart = new DateTime("2014-04-20T00:00:00.000"),
+      testUntil = new DateTime("2014-04-30T00:00:00.000"),
+      goal = Set("conversion", "view")
+    )
+    val knnAlgoParams = new KNNAlgoParams(similarity="consine")
 
-    //println("spark model")
+    SparkWorkflow.run("Fizz",
+      //evalDataParams,
+      evalParams,
+      evalParams, /* validation params */
+      //null, /* algo params */
+      //randomAlgoParams,
+      knnAlgoParams,
+      s,
+      e)
+    return
+  }
 
-    //sparkModel.foreach(println)
-    //println(sparkModel.first)
-    //val localModel = sparkModel.collect
-    //val localModel = sparkModel.take(3)
+  def main(args: Array[String]) {
+    //itemrank
+    //stock
+    test
+    //return
 
-    //localModel.foreach(println)
-    //sparkModel.persist
-    //sparkModel.saveAsObjectFile("/tmp/pio/obj")
+
+  }
 }
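The commented-out conf.set calls in test() and the R registrator above outline Spark's Kryo wiring. A minimal, self-contained version of that wiring with illustrative names (Record and MyRegistrator are not part of the patch):

    import org.apache.spark.SparkConf
    import org.apache.spark.serializer.{ KryoRegistrator, KryoSerializer }
    import com.esotericsoftware.kryo.Kryo

    // Illustrative class to register with Kryo.
    class Record(val id: Int)

    class MyRegistrator extends KryoRegistrator {
      override def registerClasses(kryo: Kryo) {
        kryo.register(classOf[Record])
      }
    }

    object KryoConfSketch {
      def conf(): SparkConf = new SparkConf()
        .setAppName("kryo-sketch")
        .set("spark.serializer", classOf[KryoSerializer].getName)
        .set("spark.kryo.registrator", classOf[MyRegistrator].getName)
    }
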
diff --git a/spark/src/main/scala/SparkWorkflow.scala b/spark/src/main/scala/SparkWorkflow.scala
index 43d7dd1732..98e7dab786 100644
--- a/spark/src/main/scala/SparkWorkflow.scala
+++ b/spark/src/main/scala/SparkWorkflow.scala
@@ -31,6 +31,9 @@ import org.apache.spark.rdd.RDD
 
 import scala.reflect.Manifest
 
+import org.saddle._
+import com.twitter.chill.Externalizer
+
 object SparkWorkflow {
   def run[
       EDP <: BaseEvaluationDataParams : Manifest,
@@ -47,12 +50,14 @@
       CVR <: BaseCrossValidationResults : Manifest](
     batch: String,
     evalDataParams: BaseEvaluationDataParams,
+    validationParams: BaseValidationParams,
     algoParams: BaseAlgoParams,
     baseEvaluator
       : BaseEvaluator[EDP,VP,TDP,VDP,TD,F,P,A,VU,VR,CVR],
     baseEngine: BaseEngine[TD,CD,F,P]): Unit = {
 
     val conf = new SparkConf().setAppName(s"PredictionIO: $batch")
+    conf.set("spark.local.dir", "/home/yipjustin/tmp/spark")
     val sc = new SparkContext(conf)
 
     val dataPrep = baseEvaluator.dataPreparatorBaseClass.newInstance
@@ -78,19 +83,19 @@ object SparkWorkflow {
     val trainingDataMap: RDD[(Int, TD)] =
       trainingParamsMap.mapValues(dataPrep.prepareTrainingBase)
 
-    trainingDataMap.foreach(println)
+    //trainingDataMap.collect.foreach(println)
 
     // Prepare Validation Data
     val validationDataMap: RDD[(Int, (F, A))] =
       validationParamsMap.flatMapValues(dataPrep.prepareValidationSpark)
 
-    validationDataMap.foreach(println)
+    validationDataMap.collect.foreach(println)
 
     // TODO: Cleanse Data
 
     // Model Training
-    //val algo = baseEngine.algorithmBaseClassMap("regression").newInstance
-    val algo = baseEngine.algorithmBaseClassMap("random").newInstance
+    val algo = baseEngine.algorithmBaseClassMap("regression").newInstance
+    //val algo = baseEngine.algorithmBaseClassMap("knn").newInstance
     algo.initBase(algoParams)
 
     val modelMap: RDD[(Int, BaseModel)] =
@@ -100,12 +105,21 @@ object SparkWorkflow {
 
     // Prediction
     val modelValidationMap: RDD[(Int, (Iterable[BaseModel], Iterable[(F,A)]))] =
-      trainingDataMap.cogroup(validationDataMap)
+      modelMap.cogroup(validationDataMap)
 
-    val predictionMap: RDD[(Int, Iterable[(F, A, P)])] =
+    val predictionMap
+    : RDD[(Int, Iterable[(BaseFeature, BasePrediction, BaseActual)])] =
       modelValidationMap.mapValues(algo.predictSpark)
 
-    predictionMap.foreach(println)
+    predictionMap.collect.foreach(println)
+
+    // Validation
+    /*
+    val validator = baseEvaluator.validatorBaseClass.newInstance
+    validator.initBase(validationParams)
+
+    val validationUnitMap: RDD[(Int, BaseValidationUnit)]
+    */
 
     //val trainingSet = trainingParamsSet.map(dataPrep.prepareTrainingBase)
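The switch to modelMap.cogroup(validationDataMap) above pairs each evaluation index with its trained model(s) and its validation (feature, actual) records before algo.predictSpark runs. A toy version of that join with plain String payloads in place of BaseModel and (Feature, Actual); it assumes an existing SparkContext:

    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._
    import org.apache.spark.rdd.RDD

    object CogroupSketch {
      def run(sc: SparkContext): Unit = {
        val modelMap: RDD[(Int, String)] =
          sc.parallelize(Seq(0 -> "model0", 1 -> "model1"))
        val validationMap: RDD[(Int, String)] =
          sc.parallelize(Seq(0 -> "f0a", 0 -> "f0b", 1 -> "f1a"))

        // Same shape as modelMap.cogroup(validationDataMap): every key keeps
        // all models and all validation records that share it.
        val grouped: RDD[(Int, (Iterable[String], Iterable[String]))] =
          modelMap.cogroup(validationMap)

        val predictions = grouped.mapValues { case (models, features) =>
          val model = models.head
          features.map(f => model + " predicts for " + f)
        }
        predictions.collect().foreach(println)
      }
    }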