Commit 8a0ec3f

First version of using Events in ItemRank engine.

k4hoo committed Sep 11, 2014
1 parent 089f7bc
Showing 8 changed files with 182 additions and 69 deletions.
11 changes: 11 additions & 0 deletions engines/src/main/scala/itemrank/Data.scala
@@ -2,6 +2,17 @@ package io.prediction.engines.itemrank

import com.github.nscala_time.time.Imports._

/* this engine requires the following attributes */
case class AttributeNames(
  val user: String, // name of user entityType
  val item: String, // name of item entityType
  val u2iActions: Set[String], // event names of the u2i actions
  val itypes: String, // name of itypes attribute
  val starttime: String, // name of starttime attribute, DateTime
  val endtime: String, // name of endtime attribute, DateTime
  val inactive: String, // name of item inactive property, boolean
  val rating: String // name of rating property, integer rating
)

class ItemTD(
  val iid: String,
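As a quick illustration, this is how an AttributeNames value could be constructed; a sketch only, with the field values simply mirroring params/datasource.json further down in this commit:

```scala
// Sketch: AttributeNames populated with the same values as
// examples/params/datasource.json in this commit.
val attributeNames = AttributeNames(
  user = "user",           // entityType of users
  item = "item",           // entityType of items
  u2iActions = Set("view", "like", "conversion", "rate"),
  itypes = "pio_itypes",   // item property holding the itypes array
  starttime = "starttime", // DateTime property
  endtime = "endtime",     // DateTime property
  inactive = "inactive",   // boolean property
  rating = "pio_rate"      // integer rating property
)
```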
91 changes: 91 additions & 0 deletions engines/src/main/scala/itemrank/DataSource.scala
@@ -0,0 +1,91 @@
package io.prediction.engines.itemrank

import io.prediction.controller.LDataSource
import io.prediction.controller.Params
import io.prediction.data.view.LBatchView

import org.joda.time.DateTime

case class EventsDataSourceParams(
  val appId: Int,
  // default None to include all itypes
  val itypes: Option[Set[String]] = None, // train items with these itypes
  // actions for training
  val actions: Set[String],
  val startTime: Option[DateTime], // event starttime
  val untilTime: Option[DateTime], // event untiltime
  val attributeNames: AttributeNames
) extends Params


class EventsDataSource(dsp: EventsDataSourceParams)
  extends LDataSource[EventsDataSourceParams,
    DataParams, TrainingData, Query, Actual] {

  @transient lazy val batchView = new LBatchView(dsp.appId,
    dsp.startTime, dsp.untilTime)

  override
  def readTraining(): TrainingData = {

    val attributeNames = dsp.attributeNames
    // uid => (UserTD, uindex)
    val usersMap: Map[String, (UserTD, Int)] = batchView
      .aggregateProperties(attributeNames.user)
      .zipWithIndex
      .map { case ((entityId, dataMap), index) =>
        val userTD = new UserTD(uid = entityId)
        (entityId -> (userTD, index + 1)) // make index starting from 1
      }

    // iid => (ItemTD, iindex)
    val itemsMap = batchView
      .aggregateProperties(attributeNames.item)
      .map { case (entityId, dataMap) =>
        val itemTD = new ItemTD(
          iid = entityId,
          itypes = dataMap.get[List[String]](attributeNames.itypes),
          starttime = dataMap.getOpt[DateTime](attributeNames.starttime)
            .map(_.getMillis),
          endtime = dataMap.getOpt[DateTime](attributeNames.endtime)
            .map(_.getMillis),
          inactive = dataMap.getOpt[Boolean](attributeNames.inactive)
            .getOrElse(false)
        )
        (entityId -> itemTD)
      }.filter { case (id, itemTD) =>
        // keep items whose itypes intersect dsp.itypes (None keeps all)
        dsp.itypes.map { t =>
          itemTD.itypes.toSet.intersect(t).nonEmpty
        }.getOrElse(true)
      }.zipWithIndex.map { case ((id, itemTD), index) =>
        (id -> (itemTD, index + 1)) // make index starting from 1
      }

    // convert matching events to U2IActionTD using the 1-based indices above
    val u2iActions = batchView.events
      .filter { e =>
        attributeNames.u2iActions.contains(e.event) &&
        dsp.actions.contains(e.event) &&
        usersMap.contains(e.entityId) &&
        // if the event has no targetEntityId, include it here as well,
        // even though that is an error case;
        // it is checked and flagged in the next step
        e.targetEntityId.map(itemsMap.contains(_)).getOrElse(true)
      }.map { e =>
        // make sure targetEntityId exists in this event
        require(e.targetEntityId.nonEmpty,
          s"u2i event ${e} must have a targetEntityId.")
        new U2IActionTD(
          uindex = usersMap(e.entityId)._2,
          iindex = itemsMap(e.targetEntityId.get)._2,
          action = e.event,
          v = e.properties.getOpt[Int](attributeNames.rating),
          t = e.eventTime.getMillis
        )
      }

    new TrainingData(
      users = usersMap.map { case (k, (v1, v2)) => (v2, v1) },
      items = itemsMap.map { case (k, (v1, v2)) => (v2, v1) },
      u2iActions = u2iActions
    )
  }
}
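To show how the pieces fit together, here is a minimal, hypothetical wiring of this data source; the time window values are made up, and attributeNames is assumed to be built as in the Data.scala sketch above:

```scala
import org.joda.time.DateTime

// Hypothetical params: train on app 3 using one month of events.
val dsp = EventsDataSourceParams(
  appId = 3,
  itypes = None, // None => include all itypes
  actions = Set("view", "like", "conversion", "rate"),
  startTime = Some(new DateTime("2014-04-01T00:00:00.000Z")),
  untilTime = Some(new DateTime("2014-05-01T00:00:00.000Z")),
  attributeNames = attributeNames // as in the Data.scala sketch above
)
val trainingData = new EventsDataSource(dsp).readTraining()
```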
3 changes: 2 additions & 1 deletion engines/src/main/scala/itemrank/Engine.scala
@@ -6,7 +6,8 @@ import io.prediction.controller.Engine
object ItemRankEngine extends IEngineFactory {
  def apply() = {
    new Engine(
      classOf[ItemRankDataSource],
      //classOf[ItemRankDataSource],
      classOf[EventsDataSource],
      classOf[ItemRankPreparator],
      Map("knn" -> classOf[KNNAlgorithm],
        "rand" -> classOf[RandomAlgorithm],
98 changes: 38 additions & 60 deletions engines/src/main/scala/itemrank/README.md
@@ -3,7 +3,7 @@ ItemRank engine

## Use case

In each day (or other time unit), there is a list of items to be shown to users and we want to personalize the order of the list of items for each user.
Personalize the order of a list of items for each user.

## Prediction Input

@@ -14,25 +14,42 @@ In each day (or other time unit), there is a list of items to be shown to users

- ranked items with score
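
To make the input and output concrete: a sketch of the query, with field names inferred from the curl example further down (the engine's actual Query class is not shown in this commit):

```scala
// Hypothetical shape inferred from the curl example below; the real
// Query class for this engine is defined elsewhere in the repo.
case class Query(uid: String, items: List[String])

val query = Query(uid = "u2", items = List("i0", "i1", "i2", "i3"))
```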

## Events requirement

## App Data
- user entity
- item entity, with the following properties:
  - itypes: array of String
  - starttime: ISO 8601
  - endtime: ISO 8601
  - inactive: boolean
- user-to-item action event, optionally with the following properties:
  - rating: integer rating (see the event sketch below)

* Users
* Items
* U2IActions
* ItemSets - the list of items at each day (or other time unit)
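
For illustration, a hypothetical u2i "rate" event in the shape this data source consumes; the field names follow DataSource.scala and the property names follow params/datasource.json, but the exact event-server payload may differ:

```
{
  "event" : "rate",
  "entityType" : "user",
  "entityId" : "u2",
  "targetEntityType" : "item",
  "targetEntityId" : "i1",
  "properties" : { "pio_rate" : 4 },
  "eventTime" : "2014-09-09T16:17:42.937Z"
}
```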
## Example Run

Import Sample Data
==================
Run under examples.
Import sample events
```
$ python sdk/python-sdk/itemrec_example.py --appid <appid>
```
examples$ set -a
examples$ source ../conf/pio-env.sh
examples$ set +a

examples$ ../sbt/sbt "runMain io.prediction.engines.itemrank.CreateSampleData"
```
$ cd engines/src/main/scala/itemrank/examples
```

Update the appId in params/datasource.json with <appid>

```
$ ../../../../../../bin/pio register
$ ../../../../../../bin/pio train -ap mahoutalgo.json
$ ../../../../../../bin/pio deploy
```

Retrieve prediction:

```
$ curl -i -X POST http://localhost:8000/ \
-d '{ "uid" : "u2", "items" : ["i0", "i1", "i2", "i3"] }'
```

Run Evaluation
==============
@@ -42,52 +59,13 @@ Run Evaluation
```
../bin/pio-run io.prediction.engines.itemrank.Runner
```

Import Sample data (Obsolete)
Import Sample Data (obsolete)
==================
Run under examples.
```
examples$ set -a
examples$ source ../conf/pio-env.sh
examples$ set +a
Start Mongo:

$ mongod

At project root directory (**Imagine/**):

$ sbt/sbt "engines/runMain io.prediction.engines.itemrank.CreateSampleData"


By default, it imports data into mongo with appid=1 and creates 30 days of sample data. You may specify a different appid or a different number of days with the --appid and --days parameters.

For example, to import to appid=4 with 90 days of data:

$ sbt/sbt "engines/runMain io.prediction.engines.itemrank.CreateSampleData --appid 4 --days 90"


Runner with spark-submit (Obsolete)
=========================

At project root directory (**Image/**):

$ sbt/sbt package
$ sbt/sbt engines/assemblyPackageDependency
$ $SPARK_HOME/bin/spark-submit --jars engines/target/scala-2.10/engines-assembly-0.8.0-SNAPSHOT-deps.jar,/Users/ckh/dev/mac_dev/pio/Imagine/engines/target/scala-2.10/engines_2.10-0.8.0-SNAPSHOT.jar --deploy-mode "client" --class "io.prediction.engines.itemrank.Runner" core/target/scala-2.10/core_2.10-0.8.0-SNAPSHOT.jar


Run Evaluation (Obsolete)
==============

At project root directory (**Image/**):

$ sbt/sbt package
$ sbt/sbt engines/assemblyPackageDependency
$ sbt/sbt "core/runMain io.prediction.tools.RegisterEngine ../engines/src/main/scala/itemrank/examples/manifest.json"

KNNAlgorithm:

$ sbt/sbt "core/runMain io.prediction.tools.RunEvaluationWorkflow --sparkHome $SPARK_HOME io.prediction.engines.itemrank 0.8.0-SNAPSHOT --jsonDir ../engines/src/main/scala/itemrank/examples --ap kNNAlgoParams.json"

MahoutItemBasedAlgorithm:

$ sbt/sbt "core/runMain io.prediction.tools.RunEvaluationWorkflow --sparkHome $SPARK_HOME io.prediction.engines.itemrank 0.8.0-SNAPSHOT --jsonDir ../engines/src/main/scala/itemrank/examples --ap mahoutAlgoParams.json"

RandomAlgorithm:

$ sbt/sbt "core/runMain io.prediction.tools.RunEvaluationWorkflow --sparkHome $SPARK_HOME io.prediction.engines.itemrank 0.8.0-SNAPSHOT --jsonDir ../engines/src/main/scala/itemrank/examples --ap randomAlgoParams.json"
examples$ ../sbt/sbt "runMain io.prediction.engines.itemrank.CreateSampleData"
```
10 changes: 10 additions & 0 deletions engines/src/main/scala/itemrank/examples/params/OLDdatasource.json
@@ -0,0 +1,10 @@
{
  "appid": 1,
  "actions": [ "view", "like", "conversion", "rate" ],
  "hours": 24,
  "trainStart" : "2014-04-01T00:00:00.000Z",
  "testStart" : "2014-04-20T00:00:00.000Z",
  "testUntil" : "2014-04-21T00:00:00.000Z",
  "goal": ["conversion", "view"],
  "verbose" : true
}
18 changes: 11 additions & 7 deletions engines/src/main/scala/itemrank/examples/params/datasource.json
@@ -1,10 +1,14 @@
{
  "appid": 1,
  "appId": 3,
  "actions": [ "view", "like", "conversion", "rate" ],
  "hours": 24,
  "trainStart" : "2014-04-01T00:00:00.000Z",
  "testStart" : "2014-04-20T00:00:00.000Z",
  "testUntil" : "2014-04-21T00:00:00.000Z",
  "goal": ["conversion", "view"],
  "verbose" : true
  "attributeNames" : {
    "user" : "user",
    "item" : "item",
    "u2iActions" : [ "view", "like", "conversion", "rate" ],
    "itypes" : "pio_itypes",
    "starttime" : "starttime",
    "endtime" : "endtime",
    "inactive" : "inactive",
    "rating" : "pio_rate"
  }
}
2 changes: 1 addition & 1 deletion examples/scala-test/HelloWorld.scala
@@ -7,7 +7,7 @@ import scala.collection.immutable.HashMap
import io.prediction.data.view.LBatchView

// all data needs to be serializable
class MyTrainingData(
case class MyTrainingData(
  // list of (day, temperature) tuples
  val data: List[String],
  val temperatures: List[(String, Double)]
18 changes: 18 additions & 0 deletions examples/scala-test/Runner.scala
@@ -0,0 +1,18 @@
package org.sample.test

import io.prediction.controller._

object Runner {

  def main(args: Array[String]) {

    Workflow.run(
      dataSourceClassOpt = Some(classOf[MyDataSource]),
      params = WorkflowParams(
        verbose = 3,
        batch = "MyDataSource")
    )

  }

}
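
Presumably this runner is launched like the other examples; the exact invocation below is an assumption based on the sbt commands used elsewhere in this commit:

```
examples$ ../sbt/sbt "runMain org.sample.test.Runner"
```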
