Skip to content

Commit

Permalink
stock data view.
Browse files Browse the repository at this point in the history
  • Loading branch information
Justin Yip committed Sep 12, 2014
1 parent 0bfd9e5 commit 9d93d69
Show file tree
Hide file tree
Showing 2 changed files with 146 additions and 67 deletions.
185 changes: 120 additions & 65 deletions examples/src/main/scala/stock2/YahooDataSource.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import org.joda.time.DateTimeZone
import com.github.nscala_time.time.Imports._

import scala.collection.mutable.{ Map => MMap }
import scala.collection.GenMap

import io.prediction.controller._
import io.prediction.controller.{ Params => BaseParams }
Expand All @@ -32,10 +33,10 @@ case class HistoricalData(
val close: Array[Double],
val adjClose: Array[Double],
val adjReturn: Array[Double],
val volume: Array[Double]) {
val volume: Array[Double],
val active: Array[Boolean]) extends Serializable {

override
def toString(): String = {
override def toString(): String = {
s"HistoricalData($ticker, ${timeIndex.head}, ${timeIndex.last})"
}
}
Expand All @@ -55,49 +56,74 @@ class YahooDataSource(val params: YahooDataSource.Params)
val timezone = DateTimeZone.forID("US/Eastern")
val marketTicker = params.windowParams.marketTicker

def mergeTimeIndex(intermediate: YahooDataSource.Intermediate, e: Event)
val market: HistoricalData = getTimeIndex()
//val timeIndex: Array[DateTime] = Array[DateTime](market.timeIndex:_*)
val timeIndex: Array[DateTime] = market.timeIndex
val timeIndexSet: Set[DateTime] = timeIndex.toSet

def merge(intermediate: YahooDataSource.Intermediate, e: Event,
timeIndexSetOpt: Option[Set[DateTime]])
: YahooDataSource.Intermediate = {
println("Merge")
val dm: DataMap = e.properties

val dailyMap = intermediate.dailyMap
// TODO: Check ticker in intermediate

println(dm)


val yahooData = dm.get[JObject]("yahoo")

// used by json4s "extract" method.
implicit val formats = DefaultFormats

val closeList = (yahooData \ "close").extract[Array[Double]]
val adjCloseList = (yahooData \ "adjclose").extract[Array[Double]]
val volumeList = (yahooData \ "volume").extract[Array[Double]]


val tList: Array[DateTime] = (yahooData \ "t").extract[Array[Long]]
.map(t => new DateTime(t * 1000, timezone))

tList.zipWithIndex.drop(1).foreach { case (t, idx) => {
val adjReturn = (adjCloseList(idx) / adjCloseList(idx - 1)) - 1
// Add data either
// 1. timeIndex exists and t is in timeIndex, or
// 2. timeIndex is None.
val newDailyMap: Map[DateTime, YahooDataSource.Daily] =
tList.zipWithIndex.drop(1)
.filter { case (t, idx) => timeIndexSetOpt.map(_(t)).getOrElse(true) }
.map { case (t, idx) =>
val adjReturn = (adjCloseList(idx) / adjCloseList(idx - 1)) - 1

val daily = YahooDataSource.Daily(
close = closeList(idx),
adjClose = adjCloseList(idx),
adjReturn = adjReturn,
volume = volumeList(idx),
active = true,
prevDate = tList(idx - 1))

(t -> daily)
}
.toMap

YahooDataSource.Intermediate(
ticker = e.entityId,
dailyMap = intermediate.dailyMap ++ newDailyMap)
}

val daily = YahooDataSource.Daily(
close = closeList(idx),
adjClose = adjCloseList(idx),
adjReturn = adjReturn,
volume = volumeList(idx),
prevDate = tList(idx - 1))
def mergeTimeIndex(intermediate: YahooDataSource.Intermediate, e: Event)
: YahooDataSource.Intermediate = merge(intermediate, e, None)

dailyMap += (t -> daily)
}}

YahooDataSource.Intermediate(ticker = e.entityId, dailyMap = dailyMap)
}
def mergeStock(intermediate: YahooDataSource.Intermediate, e: Event)
: YahooDataSource.Intermediate = merge(intermediate, e, Some(timeIndexSet))

def finalizeTimeIndex(intermediate: YahooDataSource.Intermediate): HistoricalData = {
def finalizeTimeIndex(intermediate: YahooDataSource.Intermediate)
: HistoricalData = {
val dailyMap = intermediate.dailyMap
val ticker = intermediate.ticker

val timeIndex: Array[DateTime] = dailyMap.keys.toArray.sortBy(identity)
// Construct the time index with windowParams
val timeIndex: Array[DateTime] = dailyMap.keys.toArray
.filter(_.isAfter(params.windowParams.baseDate))
.sortBy(identity)
.take(params.windowParams.untilIdx)

// Check if the time is continuous
(1 until timeIndex.size).foreach { idx => {
require(dailyMap(timeIndex(idx)).prevDate == timeIndex(idx - 1),
s"Time must be continuous. " +
Expand All @@ -112,27 +138,50 @@ class YahooDataSource(val params: YahooDataSource.Params)
close = timeIndex.map(t => dailyMap(t).close),
adjClose = timeIndex.map(t => dailyMap(t).adjClose),
adjReturn = timeIndex.map(t => dailyMap(t).adjReturn),
volume = timeIndex.map(t => dailyMap(t).volume))
volume = timeIndex.map(t => dailyMap(t).volume),
active = Array.fill(timeIndex.size)(true))
}

/*
def read() {
val predicate = (e: Event) => (e.entityType == "yahoo")
val map: (Event => DataMap) = (e: Event) => (e.properties)
// Traverse the timeIndex to construct the actual time series using dailyMap
// and extra fillNA logic.
//
// The time series is constructed in the same order as the global timeIndex
// array. For a datetime t, if dailyMap contains the data, it calls valueFunc
// to extract the value; otherwise, it calls fillNaFunc with the optional last
// extracted value and get the default value.
def activeFilter[A : Manifest](
dailyMap: GenMap[DateTime, YahooDataSource.Daily],
valueFunc: YahooDataSource.Daily => A,
fillNAFunc: Option[A] => A) : Array[A] = {

var lastOpt: Option[A] = None

timeIndex
.map { t =>
if (dailyMap.contains(t)) {
val v = valueFunc(dailyMap(t))
lastOpt = Some(v)
v
} else {
fillNAFunc(lastOpt)
}
}
.toArray
}

val tickerMap: Map[String, HistoricalData] = batchView
.aggregateByEntityOrdered(
predicate,
YahooDataSource.Intermediate(),
merge)
.mapValues(finalize)
def finalizeStock(intermediate: YahooDataSource.Intermediate)
: HistoricalData = {
val dailyMap = intermediate.dailyMap

tickerMap.foreach { case (ticker, data) => {
println(ticker)
println(data)
}}
HistoricalData(
ticker = intermediate.ticker,
timeIndex = timeIndex,
close = activeFilter[Double](dailyMap, _.close, _.getOrElse(0.0)),
adjClose = activeFilter[Double](dailyMap, _.adjClose, _.getOrElse(0.0)),
adjReturn = activeFilter[Double](dailyMap, _.adjReturn, _.getOrElse(0.0)),
volume = activeFilter[Double](dailyMap, _.adjReturn, _ => 0.0),
active = activeFilter[Boolean](dailyMap, _.active, _ => false))
}
*/

def getTimeIndex(): HistoricalData = {
// Only extracts market ticker as the main reference of market hours
Expand All @@ -145,35 +194,31 @@ class YahooDataSource(val params: YahooDataSource.Params)
YahooDataSource.Intermediate(),
mergeTimeIndex)
.mapValues(finalizeTimeIndex)

tickerMap(marketTicker)
}

def getHistoricalDataSet()
: Map[String, HistoricalData] = {
val tickerSet = params.windowParams.tickerList.toSet
val predicate = (e: Event) =>
(e.entityType == params.entityType && tickerSet(e.entityId))

val market = tickerMap(marketTicker)
println(market)
market
tickerMap: Map[String, HistoricalData] = batchView
.aggregateByEntityOrdered(
predicate,
YahooDataSource.Intermediate(),
mergeStock)
.mapValues(finalizeStock)
}

def read(sc: SparkContext)
: Seq[(AnyRef, RDD[AnyRef], RDD[(AnyRef, AnyRef)])] = {

getTimeIndex
val data = getHistoricalDataSet()

/*
data.keys.foreach { println }

val predicate = (e: Event) => (e.entityType == "yahoo")
val map: (Event => DataMap) = (e: Event) => (e.properties)
val tickerMap: Map[String, HistoricalData] = batchView
.aggregateByEntityOrdered(
predicate,
YahooDataSource.Intermediate(),
merge)
.mapValues(finalize)
tickerMap.foreach { case (ticker, data) => {
println(ticker)
println(data)
}}
*/

Seq[(AnyRef, RDD[AnyRef], RDD[(AnyRef, AnyRef)])]()
}
Expand All @@ -194,12 +239,22 @@ object YahooDataSource {
val adjClose: Double,
val adjReturn: Double,
val volume: Double,
val active: Boolean,
// prevDate is used to verify continuity
val prevDate: DateTime)

/** Intermediate storage for constructing historical data
* @param timeIndexSet Only datetime in this set is used to create historical
* data.
*/
case class Intermediate(
val ticker: String = "",
val dailyMap: MMap[DateTime, Daily] = MMap[DateTime, Daily]())
//val dailyMap: MMap[DateTime, Daily] = MMap[DateTime, Daily]()
val dailyMap: Map[DateTime, Daily] = Map[DateTime, Daily]()
) extends Serializable {
override def toString(): String =
s"YDS.Intermediate($ticker, size=${dailyMap.size})"
}


/*
Expand All @@ -219,13 +274,13 @@ object YahooDataSourceRun {
def main(args: Array[String]) {
val dsp = YahooDataSource.Params(
windowParams = DataSourceParams(
baseDate = new DateTime(2004, 1, 1, 0, 0),
baseDate = new DateTime(2014, 1, 1, 0, 0),
fromIdx = 20,
untilIdx = 50,
trainingWindowSize = 15,
maxTestingWindowSize = 10,
marketTicker = "SPY",
tickerList = Seq("AAPL", "MSFT", "IBM")),
tickerList = Seq("AAPL", "MSFT", "IBM", "FB", "AMZN")),
appId = 1,
entityType = "yahoo",
untilTime = Some(new DateTime(2014, 5, 1, 0, 0)))
Expand Down
28 changes: 26 additions & 2 deletions sdk/python-sdk/examples/fetch_yahoo.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ def import_predefined():
(datetime(2014, 6, 1), datetime(2014, 7, 1), datetime(2014, 7, 15)),
]

#ticker = 'SPY'
tickers = ['SPY', 'AAPL', 'IBM', 'MSFT']

appid = 1
Expand All @@ -81,7 +80,32 @@ def import_predefined():
for time_slice in time_slices:
import_data(client, appid, ticker,
time_slice[0], time_slice[1], time_slice[2])
#return

# below are data with holes
time_slices = [
(datetime(2014, 1, 1), datetime(2014, 1, 20), datetime(2014, 2, 10)),
(datetime(2014, 2, 10), datetime(2014, 3, 31), datetime(2014, 4, 2)),
(datetime(2014, 6, 1), datetime(2014, 7, 1), datetime(2014, 7, 15)),
]

tickers = ['AMZN']
for ticker in tickers:
for time_slice in time_slices:
import_data(client, appid, ticker,
time_slice[0], time_slice[1], time_slice[2])

time_slices = [
(datetime(2014, 1, 10), datetime(2014, 2, 20), datetime(2014, 2, 28)),
(datetime(2014, 2, 10), datetime(2014, 3, 31), datetime(2014, 4, 2)),
]
tickers = ['FB']
for ticker in tickers:
for time_slice in time_slices:
import_data(client, appid, ticker,
time_slice[0], time_slice[1], time_slice[2])





def import_one():
Expand Down

0 comments on commit 9d93d69

Please sign in to comment.