Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
…into develop
  • Loading branch information
Justin Yip committed Mar 12, 2015
2 parents 5b3164c + 5251c4d commit ea16070
Show file tree
Hide file tree
Showing 12 changed files with 316 additions and 33 deletions.
23 changes: 23 additions & 0 deletions bin/install.sh
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,14 @@ echo "JAVA_HOME is now set to: $JAVA_HOME"
# PredictionIO
echo -e "\033[1;36mStarting PredictionIO setup in:\033[0m $pio_dir"
cd $TEMP_DIR

# delete existing tmp file before download again
if [[ -e $PIO_FILE ]]; then
if confirm "Delete existing $PIO_FILE?"; then
rm $PIO_FILE
fi
fi

if [[ ! -e $PIO_FILE ]]; then
echo "Downloading PredictionIO..."
curl -O https://d8k1yxp8elc6b.cloudfront.net/$PIO_FILE
Expand All @@ -243,6 +251,11 @@ mkdir $vendors_dir

# Spark
echo -e "\033[1;36mStarting Spark setup in:\033[0m $spark_dir"
if [[ -e spark-$SPARK_VERSION-bin-hadoop2.4.tgz ]]; then
if confirm "Delete existing spark-$SPARK_VERSION-bin-hadoop2.4.tgz?"; then
rm spark-$SPARK_VERSION-bin-hadoop2.4.tgz
fi
fi
if [[ ! -e spark-$SPARK_VERSION-bin-hadoop2.4.tgz ]]; then
echo "Downloading Spark..."
curl -O http://d3kbcqa49mib13.cloudfront.net/spark-$SPARK_VERSION-bin-hadoop2.4.tgz
Expand All @@ -258,6 +271,11 @@ echo -e "\033[1;32mSpark setup done!\033[0m"

# Elasticsearch
echo -e "\033[1;36mStarting Elasticsearch setup in:\033[0m $elasticsearch_dir"
if [[ -e elasticsearch-$ELASTICSEARCH_VERSION.tar.gz ]]; then
if confirm "Delete existing elasticsearch-$ELASTICSEARCH_VERSION.tar.gz?"; then
rm elasticsearch-$ELASTICSEARCH_VERSION.tar.gz
fi
fi
if [[ ! -e elasticsearch-$ELASTICSEARCH_VERSION.tar.gz ]]; then
echo "Downloading Elasticsearch..."
curl -O https://download.elasticsearch.org/elasticsearch/elasticsearch/elasticsearch-$ELASTICSEARCH_VERSION.tar.gz
Expand All @@ -276,6 +294,11 @@ echo -e "\033[1;32mElasticsearch setup done!\033[0m"

# HBase
echo -e "\033[1;36mStarting HBase setup in:\033[0m $hbase_dir"
if [[ -e hbase-$HBASE_VERSION-hadoop2-bin.tar.gz ]]; then
if confirm "Delete existing hbase-$HBASE_VERSION-hadoop2-bin.tar.gz?"; then
rm hbase-$HBASE_VERSION-hadoop2-bin.tar.gz
fi
fi
if [[ ! -e hbase-$HBASE_VERSION-hadoop2-bin.tar.gz ]]; then
echo "Downloading HBase..."
curl -O http://archive.apache.org/dist/hbase/hbase-$HBASE_VERSION/hbase-$HBASE_VERSION-hadoop2-bin.tar.gz
Expand Down
32 changes: 19 additions & 13 deletions core/src/main/scala/controller/Evaluation.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import io.prediction.core.BasePreparator
import io.prediction.core.BaseAlgorithm
import io.prediction.core.BaseServing
import io.prediction.core.BaseEvaluator
import io.prediction.core.BaseEvaluatorResult
import io.prediction.core.Doer
import io.prediction.core.BaseEngine
// import io.prediction.workflow.EngineWorkflow
Expand Down Expand Up @@ -56,24 +57,25 @@ import grizzled.slf4j.Logger
*/
trait Evaluation extends Deployment {
protected [this] var _evaluatorSet: Boolean = false
protected [this] var _evaluator: BaseEvaluator[_, _, _, _, _] = _
protected [this] var _evaluator: BaseEvaluator[_, _, _, _, _ <: BaseEvaluatorResult] = _

private [prediction]
def evaluator: BaseEvaluator[_, _, _, _, _] = {
def evaluator: BaseEvaluator[_, _, _, _, _ <: BaseEvaluatorResult] = {
assert(_evaluatorSet, "Evaluator not set")
_evaluator
}

private [prediction]
def engineEvaluator
: (BaseEngine[_, _, _, _], BaseEvaluator[_, _, _, _, _]) = {
assert(_evaluatorSet, "Evaluator not set")
(engine, _evaluator)
}

/** Sets both the [[Engine]] and [[Evaluator]] for this [[Evaluation]]. */
def engineEvaluator_=[EI, Q, P, A](
engineEvaluator: (BaseEngine[EI, Q, P, A], BaseEvaluator[EI, Q, P, A, _])) {
def engineEvaluator_=[EI, Q, P, A, R <: BaseEvaluatorResult](
engineEvaluator: (
BaseEngine[EI, Q, P, A],
BaseEvaluator[EI, Q, P, A, R])) {
assert(!_evaluatorSet, "Evaluator can be set at most once")
engine = engineEvaluator._1
_evaluator = engineEvaluator._2
Expand All @@ -83,17 +85,21 @@ trait Evaluation extends Deployment {
/** Returns both the [[Engine]] and [[Metric]] contained in this
* [[Evaluation]].
*/
private [prediction]
def engineMetric: (BaseEngine[_, _, _, _], Metric[_, _, _, _, _]) = {
throw new NotImplementedError("You should not call this")
(engine, null.asInstanceOf[Metric[_,_,_,_,_]])
throw new NotImplementedError("This method is to keep the compiler happy")
}

/** Sets both the [[Engine]] and [[Metric]] for this [[Evaluation]]. */
/** Sets the [[Engine]], the primary [[Metric]], and a list of auxiliary
* [[Metric]]s for this [[Evaluation]]. */
def engineMetric_=[EI, Q, P, A](
engineMetric: (BaseEngine[EI, Q, P, A], Metric[EI, Q, P, A, _])) {
val e: BaseEvaluator[EI, Q, P, A, _] = new MetricEvaluator(engineMetric._2)
engineEvaluator = (engineMetric._1, e)
engineMetric: (
BaseEngine[EI, Q, P, A],
Metric[EI, Q, P, A, _],
Seq[Metric[EI, Q, P, A, _]])) {
//val e: BaseEvaluator[EI, Q, P, A, R <: BaseEvaluatorResult] =
// new MetricEvaluator(engineMetric._2)
engineEvaluator = (
engineMetric._1,
new MetricEvaluator(engineMetric._2, engineMetric._3))
}

}
43 changes: 43 additions & 0 deletions core/src/main/scala/controller/Metric.scala
Original file line number Diff line number Diff line change
Expand Up @@ -73,3 +73,46 @@ extends Metric[EI, Q, P, A, Double] {
(r.map(_._1).sum / r.map(_._2).sum)
}
}

/** Returns the global average of the non-None scores returned by the
  * per-tuple `calculate` method.
  *
  * Tuples for which `calculate` returns `None` are excluded from both the sum
  * and the count. If no tuple in any evaluation set yields a score, the result
  * is `Double.NegativeInfinity`.
  *
  * @tparam EI Evaluation information
  * @tparam Q Query
  * @tparam P Predicted result
  * @tparam A Actual result
  *
  * @group Evaluation
  */
abstract class OptionAverageMetric[EI, Q, P, A]
    extends Metric[EI, Q, P, A, Double] {
  /** Implement this method to return a score that will be used for averaging
    * across all QPA tuples. Return `None` to leave the tuple out of the
    * average.
    */
  def calculate(q: Q, p: P, a: A): Option[Double]

  /** Averages the defined per-tuple scores over every evaluation set.
    *
    * @param sc Spark context (not used directly by this implementation)
    * @param evalDataSet Evaluation sets, each pairing evaluation information
    *                    with an RDD of (query, prediction, actual) tuples
    * @return Sum of all defined scores divided by their count, or
    *         `Double.NegativeInfinity` when there is no defined score at all
    */
  def calculate(sc: SparkContext, evalDataSet: Seq[(EI, RDD[(Q, P, A)])])
  : Double = {
    // One (sum, count) pair per evaluation set. The sets are processed through
    // a parallel collection so their Spark jobs can be submitted concurrently.
    val r: Seq[(Double, Long)] = evalDataSet
      .par
      .map { case (_, qpaRDD) =>
        qpaRDD
          // Keep only the defined scores.
          .flatMap { case (q, p, a) => calculate(q, p, a).toList }
          // A single action computes both the sum and the count. Unlike
          // `reduce`, `aggregate` starts from a zero value, so an RDD with no
          // defined scores yields (0.0, 0) instead of throwing.
          .aggregate((0.0, 0L))(
            (acc, score) => (acc._1 + score, acc._2 + 1L),
            (x, y) => (x._1 + y._1, x._2 + y._2))
      }
      .seq

    val total = r.map(_._1).sum
    val count = r.map(_._2).sum
    if (count > 0) total / count else Double.NegativeInfinity
  }
}

50 changes: 42 additions & 8 deletions core/src/main/scala/controller/MetricEvaluator.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import io.prediction.core.BaseAlgorithm
import io.prediction.core.BaseDataSource
import io.prediction.core.BaseEngine
import io.prediction.core.BaseEvaluator
import io.prediction.core.BaseEvaluatorResult
import io.prediction.core.BasePreparator
import io.prediction.core.BaseServing
import io.prediction.core.Doer
Expand All @@ -48,10 +49,34 @@ import scala.language.existentials
import scala.reflect.ClassTag
import scala.reflect.Manifest

/** Scores computed for one set of engine parameters.
  *
  * @param score Result of the primary metric
  * @param otherScores Results of the auxiliary metrics, in the order the
  *                    metrics were supplied
  * @tparam R Result type of the primary metric
  */
case class MetricScores[R](score: R, otherScores: Seq[Any])


// Type is google data type
case class MetricEvaluatorResult[R](
val bestScore: R,
val bestScore: MetricScores[R],
val bestEngineParams: EngineParams,
val engineParamsScores: Seq[(EngineParams, R)]) {
val bestIdx: Int,
val metricHeader: String,
val otherMetricHeaders: Seq[String],
val engineParamsScores: Seq[(EngineParams, MetricScores[R])])
extends BaseEvaluatorResult {

override def toOneLiner(): String = {
val idx = engineParamsScores.map(_._1).indexOf(bestEngineParams)
s"Best Params Index: $idx Score: ${bestScore.score}"
}

override def toJSON(): String = {
implicit lazy val formats = Utils.json4sDefaultFormats +
new NameParamsSerializer

write(this)
}

override def toHTML(): String = html.metric_evaluator().toString

override def toString: String = {
implicit lazy val formats = Utils.json4sDefaultFormats +
Expand All @@ -68,7 +93,8 @@ case class MetricEvaluatorResult[R](
}

class MetricEvaluator[EI, Q, P, A, R](
val metric: Metric[EI, Q, P, A, R])
val metric: Metric[EI, Q, P, A, R],
val otherMetrics: Seq[Metric[EI, Q, P, A, _]] = Seq[Metric[EI, Q, P, A, _]]())
extends BaseEvaluator[EI, Q, P, A, MetricEvaluatorResult[R]] {
@transient lazy val logger = Logger[this.type]
@transient val engineInstances = Storage.getMetaDataEngineInstances
Expand All @@ -78,12 +104,14 @@ class MetricEvaluator[EI, Q, P, A, R](
engineEvalDataSet: Seq[(EngineParams, Seq[(EI, RDD[(Q, P, A)])])],
params: WorkflowParams): MetricEvaluatorResult[R] = {

val evalResultList: Seq[(EngineParams, R)] = engineEvalDataSet
val evalResultList: Seq[(EngineParams, MetricScores[R])] = engineEvalDataSet
.zipWithIndex
.par
.map { case ((engineParams, evalDataSet), idx) =>
val metricResult: R = metric.calculate(sc, evalDataSet)
(engineParams, metricResult)
val metricScores = MetricScores[R](
metric.calculate(sc, evalDataSet),
otherMetrics.map(_.calculate(sc, evalDataSet)))
(engineParams, metricScores)
}
.seq

Expand All @@ -97,12 +125,18 @@ class MetricEvaluator[EI, Q, P, A, R](
}}

// use max. take implicit from Metric.
val (bestEngineParams, bestScore) = evalResultList.reduce(
(x, y) => (if (metric.compare(x._2, y._2) >= 0) x else y))
val ((bestEngineParams, bestScore), bestIdx) = evalResultList
.zipWithIndex
.reduce { (x, y) =>
(if (metric.compare(x._1._2.score, y._1._2.score) >= 0) x else y)
}

MetricEvaluatorResult(
bestScore = bestScore,
bestEngineParams = bestEngineParams,
bestIdx = bestIdx,
metricHeader = metric.header,
otherMetricHeaders = otherMetrics.map(_.header),
engineParamsScores = evalResultList)
}
}
6 changes: 4 additions & 2 deletions core/src/main/scala/controller/Workflow.scala
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import org.json4s._
import org.json4s.native.Serialization.write
import io.prediction.data.storage.EngineInstance
import com.github.nscala_time.time.Imports.DateTime
import io.prediction.core.BaseEvaluatorResult

/** Workflow parameters.
*
Expand Down Expand Up @@ -127,7 +128,8 @@ object Workflow {
)
}

def runEvaluationTypeless[EI, Q, P, A, EEI, EQ, EP, EA, ER](
def runEvaluationTypeless[
EI, Q, P, A, EEI, EQ, EP, EA, ER <: BaseEvaluatorResult](
engine: BaseEngine[EI, Q, P, A],
engineParamsList: Seq[EngineParams],
// metric: Metric[MEI, MQ, MP, MA, MR],
Expand All @@ -146,7 +148,7 @@ object Workflow {

/** :: Experimental :: */
@Experimental
def runEvaluation[EI, Q, P, A, R](
def runEvaluation[EI, Q, P, A, R <: BaseEvaluatorResult](
engine: BaseEngine[EI, Q, P, A],
engineParamsList: Seq[EngineParams],
// metric: Metric[EI, Q, P, A, R],
Expand Down
6 changes: 5 additions & 1 deletion core/src/main/scala/controller/java/JavaWorkflow.scala
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,15 @@ object JavaWorkflow {
engineParams: EngineParams,
params: WorkflowParams
) {
/*
runEngine(
engine = engine,
engineParams = engineParams,
evaluatorClass = null,
evaluatorParams = null,
params = params
)
*/
}

/** Creates a workflow that runs an engine.
Expand All @@ -67,6 +69,7 @@ object JavaWorkflow {
* @param evaluatorParams Evaluator parameters.
* @param params Workflow parameters.
*/
/*
def runEngine[EI, TD, PD, Q, P, A, ER <: AnyRef](
engine: Engine[TD, EI, PD, Q, P, A],
engineParams: EngineParams,
Expand All @@ -75,6 +78,7 @@ object JavaWorkflow {
evaluatorParams: Params,
params: WorkflowParams
) {
*/
/*
JavaCoreWorkflow.run(
dataSourceClassMap = engine.dataSourceClassMap,
Expand All @@ -90,5 +94,5 @@ object JavaWorkflow {
params = params
)
*/
}
//}
}
13 changes: 11 additions & 2 deletions core/src/main/scala/core/BaseEvaluator.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,21 @@ import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

abstract class BaseEvaluator[EI, Q, P, A, ER]
abstract class BaseEvaluator[EI, Q, P, A, ER <: BaseEvaluatorResult]
extends AbstractDoer {

def evaluateBase(
sc: SparkContext,
engineEvalDataSet: Seq[(EngineParams, Seq[(EI, RDD[(Q, P, A)])])],
params: WorkflowParams): ER
}

/** Base trait for evaluator results: the result type of every
* [[BaseEvaluator]] must extend it.
*
* It exposes three textual renderings of a result. Each rendering defaults to
* the empty string, so implementations override only the ones they support.
*/
trait BaseEvaluatorResult extends Serializable {
/** A short description of the result. */
def toOneLiner(): String = ""

/** HTML portion of the rendered evaluator results. */
def toHTML(): String = ""

/** JSON portion of the rendered evaluator results. */
def toJSON(): String = ""
}
14 changes: 10 additions & 4 deletions core/src/main/scala/workflow/CoreWorkflow.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import io.prediction.controller.WorkflowParams
import io.prediction.core.BaseAlgorithm
import io.prediction.core.BaseDataSource
import io.prediction.core.BaseEvaluator
import io.prediction.core.BaseEvaluatorResult
import io.prediction.core.BasePreparator
import io.prediction.core.BaseServing
import io.prediction.core.BaseEngine
Expand Down Expand Up @@ -121,7 +122,7 @@ object CoreWorkflow {
}
}

def runEvaluation[EI, Q, P, A, R](
def runEvaluation[EI, Q, P, A, R <: BaseEvaluatorResult](
engine: BaseEngine[EI, Q, P, A],
engineParamsList: Seq[EngineParams],
engineInstance: EngineInstance,
Expand All @@ -141,7 +142,7 @@ object CoreWorkflow {

WorkflowUtils.checkUpgrade(mode, engineInstance.engineFactory)

val evalResult = EvaluationWorkflow.runEvaluation(
val evalResult: BaseEvaluatorResult = EvaluationWorkflow.runEvaluation(
sc,
engine,
engineParamsList,
Expand All @@ -151,11 +152,16 @@ object CoreWorkflow {
implicit lazy val formats = Utils.json4sDefaultFormats +
new NameParamsSerializer

// TODO: Save best instance to EngineInstance
val engineInstanceId = Storage.getMetaDataEngineInstances.insert(
engineInstance)

val evaledEI = engineInstance.copy(
status = "EVALCOMPLETED",
id = engineInstanceId,
endTime = DateTime.now,
evaluatorResults = evalResult.toString
evaluatorResults = evalResult.toOneLiner,
evaluatorResultsHTML = evalResult.toHTML,
evaluatorResultsJSON = evalResult.toJSON
)

logger.info(s"Insert evaluation result")
Expand Down
Loading

0 comments on commit ea16070

Please sign in to comment.