Skip to content

Commit

Permalink
Stock2 Yahoo DS.
Browse files Browse the repository at this point in the history
  • Loading branch information
Justin Yip committed Sep 11, 2014
1 parent 883a845 commit 6d1605a
Showing 1 changed file with 70 additions and 4 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
package io.prediction.data.examples
//package io.prediction.data.examples
package io.prediction.examples.stock2

// YahooDataSource reads PredictionIO event store directly.

import io.prediction.data.storage.Event
import io.prediction.data.storage.Events
Expand All @@ -12,6 +15,15 @@ import com.github.nscala_time.time.Imports._

import scala.collection.mutable.{ Map => MMap }

import io.prediction.controller._
import io.prediction.controller.{ Params => BaseParams }


import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import org.apache.spark.broadcast.Broadcast

case class HistoricalData(
val ticker: String,
val timeIndex: Array[DateTime],
Expand All @@ -26,14 +38,21 @@ case class HistoricalData(
}
}

class YahooDataSource(val params: YahooDataSource.Params) {
class YahooDataSource(val params: YahooDataSource.Params)
extends PDataSource[
YahooDataSource.Params,
AnyRef,
RDD[AnyRef],
AnyRef,
AnyRef] {
@transient lazy val batchView = new LBatchView(
params.appId, params.startTime, params.untilTime)

val timezone = DateTimeZone.forID("US/Eastern")

def merge(intermediate: YahooDataSource.Intermediate, e: Event)
: YahooDataSource.Intermediate = {
println("Merge")
val dm: DataMap = e.properties

val dailyMap = intermediate.dailyMap
Expand Down Expand Up @@ -85,6 +104,7 @@ class YahooDataSource(val params: YahooDataSource.Params) {
}


/*
def read() {
val predicate = (e: Event) => (e.entityType == "yahoo")
val map: (Event => DataMap) = (e: Event) => (e.properties)
Expand All @@ -101,14 +121,39 @@ class YahooDataSource(val params: YahooDataSource.Params) {
println(data)
}}
}
*/

def read(sc: SparkContext)
: Seq[(AnyRef, RDD[AnyRef], RDD[(AnyRef, AnyRef)])] = {
println("HEREHERHEHREHERREH")

val predicate = (e: Event) => (e.entityType == "yahoo")
val map: (Event => DataMap) = (e: Event) => (e.properties)

println("Right before reading")

val tickerMap: Map[String, HistoricalData] = batchView
.aggregateByEntityOrdered(
predicate,
YahooDataSource.Intermediate(),
merge)
.mapValues(finalize)

tickerMap.foreach { case (ticker, data) => {
println(ticker)
println(data)
}}

Seq[(AnyRef, RDD[AnyRef], RDD[(AnyRef, AnyRef)])]()
}
}

object YahooDataSource {
case class Params(
val appId: Int,
val startTime: Option[DateTime] = None,
val untilTime: Option[DateTime] = None
)
) extends BaseParams

case class Daily(
val close: Double,
Expand All @@ -129,6 +174,27 @@ object YahooDataSource {
//untilTime = None)

val ds = new YahooDataSource(params)
ds.read
//ds.read
}
}

object YahooDataSourceRun {
def main(args: Array[String]) {
val dsp = YahooDataSource.Params(
appId = 1,
untilTime = Some(new DateTime(2014, 5, 1, 0, 0)))
//untilTime = None)

Workflow.run(
dataSourceClassOpt = Some(classOf[YahooDataSource]),
dataSourceParams = dsp,
params = WorkflowParams(
verbose = 3,
batch = "YahooDataSource")
)
}
}




0 comments on commit 6d1605a

Please sign in to comment.