Skip to content

Commit

Permalink
Working ItemRank.
Browse files Browse the repository at this point in the history
  • Loading branch information
Justin Yip committed Jun 7, 2014
1 parent 940a050 commit da24d7b
Show file tree
Hide file tree
Showing 13 changed files with 227 additions and 109 deletions.
10 changes: 8 additions & 2 deletions core/build.sbt
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
name := "core"

//"com.typesafe.akka" %% "akka-agent" % "2.3.2",
//"com.typesafe.akka" %% "akka-cluster" % "2.3.2",
//"com.typesafe.akka" %% "akka-remote" % "2.3.2",

libraryDependencies ++= Seq(
"ch.qos.logback" % "logback-classic" % "1.1.2",
"com.twitter" %% "chill" % "0.3.6" exclude("com.esotericsoftware.minlog", "minlog"),
"com.typesafe" % "config" % "1.2.1",
"com.typesafe.akka" %% "akka-contrib" % "2.3.2",
"com.typesafe.akka" %% "akka-testkit" % "2.3.2",
"com.typesafe.akka" %% "akka-actor" % "2.3.2" % "provided",
"com.typesafe.akka" %% "akka-contrib" % "2.3.2" % "provided",
"com.typesafe.akka" %% "akka-testkit" % "2.3.2" % "provided",
"com.typesafe.akka" %% "akka-slf4j" % "2.3.2" % "provided",
"commons-io" % "commons-io" % "2.4",
"org.clapper" %% "grizzled-slf4j" % "1.0.2",
"org.mongodb" %% "casbah" % "2.7.2",
Expand Down
9 changes: 5 additions & 4 deletions core/src/main/scala/core/BaseController.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package io.prediction.core

import scala.collection.Iterable
//import scala.collection.Iterable
import scala.reflect.Manifest

// FIXME(yipjustin). I am being lazy...
Expand Down Expand Up @@ -67,10 +67,11 @@ abstract class BaseAlgorithm[
new PredictionSeq[F, P, BaseActual](data = output)
}

def predictSpark[TD <: BaseModel](
input: (Iterable[TD], Iterable[(BaseFeature, BaseActual)])
//def predictSpark[M <: BaseModel](
def predictSpark(
input: (Iterable[BaseModel], Iterable[(BaseFeature, BaseActual)])
): Iterable[(BaseFeature, BasePrediction, BaseActual)] = {
val model = input._1.head.asInstanceOf[M]
val model: M = input._1.head.asInstanceOf[M]

val validationSeq = input._2.map{ case(f, a) => {
val ff = f.asInstanceOf[F]
Expand Down
3 changes: 2 additions & 1 deletion core/src/main/scala/core/BaseData.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ trait BaseParams extends Serializable {
*/

// Below are internal classes used by PIO workflow
trait BasePersistentData extends AnyRef {}
//trait BasePersistentData extends AnyRef {}
trait BasePersistentData extends Serializable {}

trait BaseValidationSeq extends BasePersistentData {}

Expand Down
4 changes: 2 additions & 2 deletions engines/src/main/scala/itemrank/Data.scala
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,13 @@ class ItemTD(
val itypes: Seq[String],
val starttime: Option[Long],
val endtime: Option[Long],
val inactive: Boolean)
val inactive: Boolean) extends Serializable

class RatingTD(
val uindex: Int,
val iindex: Int,
val rating: Int,
val t: Long)
val t: Long) extends Serializable

class TrainingData(
val users: Map[Int, String], // uindex->uid
Expand Down
4 changes: 3 additions & 1 deletion engines/src/main/scala/itemrank/Engine.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ import io.prediction.EngineFactory
import io.prediction.{ DefaultCleanser, DefaultServer }

object ItemRankEngine extends EngineFactory {
override def apply(): AbstractEngine = {
//override def apply(): AbstractEngine = {
override def apply()
: BaseEngine[TrainingData,TrainingData,Feature,Prediction] = {
new BaseEngine(
classOf[DefaultCleanser[TrainingData]],
Map("knn" -> classOf[KNNAlgorithm],
Expand Down
42 changes: 32 additions & 10 deletions engines/src/main/scala/itemrank/ItemRankEvaluator.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,24 @@ import scala.math.BigDecimal
object ItemRankEvaluator extends EvaluatorFactory {

val config = new Config
val usersDb = config.getAppdataUsers
val itemsDb = config.getAppdataItems
val u2iDb = config.getAppdataU2IActions
val itemSetsDb = config.getAppdataItemSets

override def apply(): AbstractEvaluator = {
val usersDb = config.getAppdataUsers()
val itemsDb = config.getAppdataItems()
val u2iDb = config.getAppdataU2IActions()
val itemSetsDb = config.getAppdataItemSets()

//override def apply(): AbstractEvaluator = {
override def apply()
: BaseEvaluator[EvalParams,
EvalParams,
TrainDataPrepParams,
ValidationDataPrepParams,
TrainingData,
Feature,
Prediction,
Actual,
ValidationUnit,
EmptyData,
EmptyData] = {
new BaseEvaluator(
classOf[ItemRankDataPreparator],
classOf[ItemRankValidator])
Expand All @@ -42,10 +54,11 @@ class ItemRankDataPreparator
final val CONFLICT_HIGHEST: String = "highest"
final val CONFLICT_LOWEST: String = "lowest"

val usersDb = ItemRankEvaluator.usersDb
val itemsDb = ItemRankEvaluator.itemsDb
val u2iDb = ItemRankEvaluator.u2iDb
val itemSetsDb = ItemRankEvaluator.itemSetsDb
// Connection object makes the class not serializable.
//@transient val usersDb = ItemRankEvaluator.usersDb
//@transient val itemsDb = ItemRankEvaluator.itemsDb
//@transient val u2iDb = ItemRankEvaluator.u2iDb
//@transient val itemSetsDb = ItemRankEvaluator.itemSetsDb

// Data generation
override def getParamsSet(params: EvalParams)
Expand Down Expand Up @@ -83,6 +96,11 @@ class ItemRankDataPreparator


override def prepareTraining(params: TrainDataPrepParams): TrainingData = {
val usersDb = ItemRankEvaluator.usersDb
val itemsDb = ItemRankEvaluator.itemsDb
val u2iDb = ItemRankEvaluator.u2iDb
val itemSetsDb = ItemRankEvaluator.itemSetsDb

val usersMap: Map[String, Int] = usersDb.getByAppid(params.appid)
.map(_.id).zipWithIndex
.map { case (uid, index) => (uid, index + 1) }.toMap
Expand Down Expand Up @@ -178,6 +196,10 @@ class ItemRankDataPreparator
// TODO: use t to generate eval data
override def prepareValidation(params: ValidationDataPrepParams):
Seq[(Feature, Actual)] = {
val usersDb = ItemRankEvaluator.usersDb
val itemsDb = ItemRankEvaluator.itemsDb
val u2iDb = ItemRankEvaluator.u2iDb
val itemSetsDb = ItemRankEvaluator.itemSetsDb

val usersMap: Map[String, Int] = usersDb.getByAppid(params.appid)
.map(_.id).zipWithIndex
Expand Down
15 changes: 12 additions & 3 deletions engines/src/main/scala/stock/Data.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import org.saddle._
import org.saddle.index.IndexTime
import com.github.nscala_time.time.Imports._
import breeze.linalg.{ DenseMatrix, DenseVector }
import com.twitter.chill.MeatLocker

// Use data after baseData.
// Afterwards, slicing uses idx
Expand Down Expand Up @@ -57,6 +58,11 @@ class TrainingData(
val lastDate = price.rowIx.last.get
s"TrainingData $firstDate $lastDate"
}
/*
private def writeObject(oos: ObjectOutputStream): Unit = {
}
*/
}

class Model(
Expand All @@ -65,10 +71,13 @@ class Model(
class Feature(
// This is different from TrainingData. This serves as input for algorithm.
// Hence, the time series should be shorter than that of TrainingData.
val data: Frame[DateTime, String, Double]) extends BaseFeature {
//val data: Frame[DateTime, String, Double]) extends BaseFeature {
val boxedData: MeatLocker[Frame[DateTime, String, Double]]) extends BaseFeature {
override def toString(): String = {
val firstDate = data.rowIx.first.get
val lastDate = data.rowIx.last.get
//val firstDate = data.rowIx.first.get
//val lastDate = data.rowIx.last.get
val firstDate = boxedData.get.rowIx.first.get
val lastDate = boxedData.get.rowIx.last.get
s"Feature $firstDate $lastDate"
}
}
Expand Down
6 changes: 5 additions & 1 deletion engines/src/main/scala/stock/Evaluator.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import breeze.linalg.{ DenseMatrix, DenseVector }
import breeze.stats.{ mean, meanAndVariance }
import nak.regress.LinearRegression
import scala.collection.mutable.ArrayBuffer
import com.twitter.chill.MeatLocker

object StockEvaluator extends EvaluatorFactory {
// Use singleton class here to avoid re-registering hooks in config.
Expand Down Expand Up @@ -218,7 +219,10 @@ class StockDataPreparator

val featureData = Frame(data.map(e => (e._1, e._2)): _*)
val targetData = data.map(e => (e._1, e._3)).toMap
return (new Feature(data = featureData), new Target(data = targetData))
//return (new Feature(data = featureData), new Target(data = targetData))
return (new Feature(
boxedData = MeatLocker(featureData)),
new Target(data = targetData))
}

def prepareValidation(params: ValidationDataParams)
Expand Down
3 changes: 2 additions & 1 deletion engines/src/main/scala/stock/RandomAlgorithm.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ class RandomAlgorithm
}

def predict(model: EmptyModel, feature: Feature): Target = {
val tickers = feature.data.colIx.toVec.contents
//val tickers = feature.data.colIx.toVec.contents
val tickers = feature.boxedData.get.colIx.toVec.contents
val prediction = tickers.map {
ticker => (ticker, _drift + _random.nextGaussian() * _scale)
}.toMap
Expand Down
3 changes: 2 additions & 1 deletion engines/src/main/scala/stock/RegressionAlgorithm.scala
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ class RegressionAlgorithm
}

def predict(model: Model, feature: Feature): Target = {
val price: Frame[DateTime, String, Double] = feature.data
//val price: Frame[DateTime, String, Double] = feature.data
val price: Frame[DateTime, String, Double] = feature.boxedData.get
val modelData = model.data
val prediction = price.colIx.toVec.contents
// If model doesn't have the data, skip
Expand Down
47 changes: 31 additions & 16 deletions spark/build.sbt
Original file line number Diff line number Diff line change
@@ -1,29 +1,44 @@
name := "Imagine Spark"
import AssemblyKeys._

version := "0.8"
assemblySettings

scalaVersion := "2.10.4"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.0.0"
name := "pio-spark"

libraryDependencies ++= Seq(
"ch.qos.logback" % "logback-classic" % "1.1.2",
"com.twitter" %% "chill" % "0.3.6",
"org.clapper" %% "grizzled-slf4j" % "1.0.2",
"org.scala-saddle" %% "saddle-core" % "1.3.2",
"org.scalanlp" %% "breeze" % "0.7",
"org.scalanlp" %% "breeze-natives" % "0.7",
"org.scalanlp" % "nak" % "1.2.1",
"org.json4s" %% "json4s-native" % "3.2.9",
"org.apache.spark" %% "spark-core" % "1.0.0" % "provided",
"com.typesafe" % "config" % "1.2.1",
"com.typesafe.akka" %% "akka-contrib" % "2.3.2",
"com.typesafe.akka" %% "akka-testkit" % "2.3.2",
"commons-io" % "commons-io" % "2.4",
"org.clapper" %% "grizzled-slf4j" % "1.0.2",
"org.mongodb" %% "casbah" % "2.7.2",
"org.scalatest" %% "scalatest" % "2.1.6" % "test",
"org.json4s" %% "json4s-native" % "3.2.9",
"org.json4s" %% "json4s-ext" % "3.2.7")

libraryDependencies += "com.github.nscala-time" %% "nscala-time" % "1.2.0"
"com.github.scopt" %% "scopt" % "3.2.0")

// "org.apache.spark" %% "spark-core" % "1.0.0" % "provided",

resolvers += "Akka Repository" at "http://repo.akka.io/releases/"

resolvers += Resolver.url(
"Typesafe Releases",
url("http://repo.typesafe.com/typesafe/ivy-releases"))(
Resolver.ivyStylePatterns)
url("http://repo.typesafe.com/typesafe/ivy-releases"))(
Resolver.ivyStylePatterns)


addCompilerPlugin("org.scala-sbt.sxr" %% "sxr" % "0.3.0")

scalacOptions <<= (scalacOptions, scalaSource in Compile) map { (options, base) =>
options :+ ("-P:sxr:base-directory:" + base.getAbsolutePath)
}

run in Compile <<= Defaults.runTask(fullClasspath in Compile, mainClass in (Compile, run), runner in (Compile, run))

mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) =>
{
case PathList("scala", xs @ _*) => MergeStrategy.discard
case x => old(x)
}
}
Loading

0 comments on commit da24d7b

Please sign in to comment.