PDIO-425 Properly pass third party configuration files to driver and remote executors
dszeto committed Dec 9, 2014
1 parent 10bf041 commit c030f7f
Showing 6 changed files with 60 additions and 10 deletions.
3 changes: 3 additions & 0 deletions bin/compute-classpath.sh
@@ -55,5 +55,8 @@ fi
 if [ -n "$HBASE_CONF_DIR" ]; then
   CLASSPATH="$CLASSPATH:$HBASE_CONF_DIR"
 fi
+if [ -n "$ES_CONF_DIR" ]; then
+  CLASSPATH="$CLASSPATH:$ES_CONF_DIR"
+fi
 
 echo "$CLASSPATH"
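Putting $ES_CONF_DIR itself on the classpath is presumably so that the Elasticsearch 1.x client, which can load elasticsearch.yml as a classpath resource, sees the same settings as the cluster. A minimal Scala check of that effect (illustrative only, not part of the commit):

    // Illustrative only: once ES_CONF_DIR (a directory containing
    // elasticsearch.yml) is on the JVM classpath, the file becomes
    // visible as a classpath resource.
    object ClasspathCheck {
      def main(args: Array[String]): Unit = {
        val res = getClass.getClassLoader.getResource("elasticsearch.yml")
        println(Option(res).getOrElse("elasticsearch.yml not on classpath"))
      }
    }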
5 changes: 5 additions & 0 deletions conf/pio-env.sh.template
@@ -10,8 +10,13 @@
 # SPARK_HOME: Apache Spark is a hard dependency and must be configured.
 SPARK_HOME=/path_to_apache_spark
 
+# ES_CONF_DIR: You must configure this if you have advanced configuration for
+# your Elasticsearch setup.
+# ES_CONF_DIR=/opt/elasticsearch
+
 # HADOOP_CONF_DIR: You must configure this if you intend to run PredictionIO
 # with Hadoop 2.
 # HADOOP_CONF_DIR=/opt/hadoop
 
 # HBASE_CONF_DIR: You must configure this if you intend to run PredictionIO
 # with HBase on a remote cluster.
18 changes: 13 additions & 5 deletions core/src/main/scala/workflow/WorkflowUtils.scala
@@ -165,23 +165,31 @@ object WorkflowUtils extends Logging {
     s
   }
 
-  /** Detect available Hadoop ecosystem configuration files to be submitted as
+  /** Detect third party software configuration files to be submitted as
     * extras to Apache Spark. This makes sure all executors receive the same
     * configuration.
     */
-  def hadoopEcoConfFiles: Seq[String] = {
-    val ecoFiles = Map(
+  def thirdPartyConfFiles: Seq[String] = {
+    val thirdPartyFiles = Map(
+      "ES_CONF_DIR" -> "elasticsearch.yml",
       "HADOOP_CONF_DIR" -> "core-site.xml",
       "HBASE_CONF_DIR" -> "hbase-site.xml")
 
-    ecoFiles.keys.toSeq.map { k: String =>
+    thirdPartyFiles.keys.toSeq.map { k: String =>
       sys.env.get(k) map { x =>
-        val p = Seq(x, ecoFiles(k)).mkString(File.separator)
+        val p = Seq(x, thirdPartyFiles(k)).mkString(File.separator)
         if (new File(p).exists) Seq(p) else Seq[String]()
       } getOrElse Seq[String]()
     }.flatten
   }
 
+  def thirdPartyClasspaths: Seq[String] = {
+    val thirdPartyPaths = Seq("ES_CONF_DIR")
+    thirdPartyPaths.map(p =>
+      sys.env.get(p).map(Seq(_)).getOrElse(Seq[String]())
+    ).flatten
+  }
+
   def setupLogging(verbose: Boolean, debug: Boolean): Unit = {
     val layout = new PatternLayout("%d %-5p %c{2} - %m%n")
     val filter = new PIOFilter(verbose, debug)
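To make the behavior of the two helpers concrete, here is a standalone sketch of the same lookup (a hypothetical rewrite for illustration, not the project's code): each environment variable maps to the configuration file its directory should contain, and only files that actually exist are kept. thirdPartyConfFiles returns files, to be shipped to executors via --files; thirdPartyClasspaths returns directories for the driver classpath, and in this commit only ES_CONF_DIR participates in it.

    import java.io.File

    // Hypothetical standalone rewrite of the lookup above, for illustration.
    object ThirdPartyConfSketch {
      private val confFileByEnvVar = Map(
        "ES_CONF_DIR"     -> "elasticsearch.yml",
        "HADOOP_CONF_DIR" -> "core-site.xml",
        "HBASE_CONF_DIR"  -> "hbase-site.xml")

      // For every env var that is set, keep <dir>/<file> only if it exists.
      def confFiles: Seq[String] =
        confFileByEnvVar.toSeq.flatMap { case (envVar, fileName) =>
          sys.env.get(envVar)
            .map(dir => new File(dir, fileName))
            .filter(_.exists)
            .map(_.getPath)
        }

      def main(args: Array[String]): Unit =
        // With ES_CONF_DIR=/opt/elasticsearch and the file present, prints
        // /opt/elasticsearch/elasticsearch.yml
        confFiles.foreach(println)
    }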
2 changes: 1 addition & 1 deletion tools/src/main/scala/Console.scala
@@ -954,7 +954,7 @@ object Console extends Logging {
   def run(ca: ConsoleArgs): Unit = {
     compile(ca)
 
-    val extraFiles = WorkflowUtils.hadoopEcoConfFiles
+    val extraFiles = WorkflowUtils.thirdPartyConfFiles
 
     val jarFiles = jarFilesForScala
     jarFiles foreach { f => info(s"Found JAR: ${f.getName}") }
21 changes: 19 additions & 2 deletions tools/src/main/scala/RunServer.scala
@@ -37,7 +37,17 @@ object RunServer extends Logging {
     val sparkHome = ca.common.sparkHome.getOrElse(
       sys.env.get("SPARK_HOME").getOrElse("."))
 
-    val extraFiles = WorkflowUtils.hadoopEcoConfFiles
+    val extraFiles = WorkflowUtils.thirdPartyConfFiles
+
+    val driverClassPathIndex =
+      ca.common.sparkPassThrough.indexOf("--driver-class-path")
+    val driverClassPathPrefix =
+      if (driverClassPathIndex != -1)
+        Seq(ca.common.sparkPassThrough(driverClassPathIndex + 1))
+      else
+        Seq()
+    val extraClasspaths =
+      driverClassPathPrefix ++ WorkflowUtils.thirdPartyClasspaths
 
     val sparkSubmit =
       Seq(Seq(sparkHome, "bin", "spark-submit").mkString(File.separator)) ++
@@ -50,7 +60,14 @@
         "--jars",
         (em.files ++ Console.builtinEngines(
           ca.common.pioHome.get).map(_.getCanonicalPath)).mkString(",")) ++
-      (if (extraFiles.size > 0) Seq("--files") ++ extraFiles else Seq()) ++
+      (if (extraFiles.size > 0)
+        Seq("--files", extraFiles.mkString(","))
+      else
+        Seq()) ++
+      (if (extraClasspaths.size > 0)
+        Seq("--driver-class-path", extraClasspaths.mkString(":"))
+      else
+        Seq()) ++
       Seq(
         core.getCanonicalPath,
         "--engineInstanceId",
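The change to the --files handling fixes a real bug: the old `Seq("--files") ++ extraFiles` emitted every file as a separate argument, which spark-submit misreads whenever there is more than one file. spark-submit expects --files to carry a single comma-separated list and --driver-class-path a single colon-separated path. A self-contained sketch of the corrected flag construction, with illustrative paths:

    // Illustrative values; the real ones come from WorkflowUtils and the CLI.
    val extraFiles = Seq(
      "/opt/elasticsearch/elasticsearch.yml",
      "/opt/hbase/conf/hbase-site.xml")
    val extraClasspaths = Seq("/opt/elasticsearch")

    val flags =
      (if (extraFiles.size > 0)
        Seq("--files", extraFiles.mkString(","))                  // one comma-joined value
      else
        Seq()) ++
      (if (extraClasspaths.size > 0)
        Seq("--driver-class-path", extraClasspaths.mkString(":")) // one colon-joined value
      else
        Seq())

    // flags == Seq(
    //   "--files",
    //   "/opt/elasticsearch/elasticsearch.yml,/opt/hbase/conf/hbase-site.xml",
    //   "--driver-class-path", "/opt/elasticsearch")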
21 changes: 19 additions & 2 deletions tools/src/main/scala/RunWorkflow.scala
@@ -51,7 +51,17 @@ object RunWorkflow extends Logging {
     val hadoopConf = new Configuration
     val hdfs = FileSystem.get(hadoopConf)
 
-    val extraFiles = WorkflowUtils.hadoopEcoConfFiles
+    val extraFiles = WorkflowUtils.thirdPartyConfFiles
+
+    val driverClassPathIndex =
+      ca.common.sparkPassThrough.indexOf("--driver-class-path")
+    val driverClassPathPrefix =
+      if (driverClassPathIndex != -1)
+        Seq(ca.common.sparkPassThrough(driverClassPathIndex + 1))
+      else
+        Seq()
+    val extraClasspaths =
+      driverClassPathPrefix ++ WorkflowUtils.thirdPartyClasspaths
 
     val workMode = ca.metricsClass.map(_ => "Evaluation").getOrElse("Training")
     val sparkSubmit =
@@ -65,7 +75,14 @@
         "--jars",
         (em.files ++ Console.builtinEngines(
           ca.common.pioHome.get).map(_.getCanonicalPath)).mkString(",")) ++
-      (if (extraFiles.size > 0) Seq("--files") ++ extraFiles else Seq()) ++
+      (if (extraFiles.size > 0)
+        Seq("--files", extraFiles.mkString(","))
+      else
+        Seq()) ++
+      (if (extraClasspaths.size > 0)
+        Seq("--driver-class-path", extraClasspaths.mkString(":"))
+      else
+        Seq()) ++
       Seq(
         core.getCanonicalPath,
         "--env",
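RunWorkflow gets the identical treatment as RunServer. One subtlety worth isolating: a --driver-class-path that the user already passed through to Spark is not overwritten; its value becomes a prefix of the combined classpath. A minimal sketch with assumed pass-through arguments:

    // Assumed pass-through arguments, for illustration only.
    val sparkPassThrough =
      Seq("--master", "local[4]", "--driver-class-path", "/usr/lib/extra")

    val i = sparkPassThrough.indexOf("--driver-class-path")
    val driverClassPathPrefix =
      if (i != -1) Seq(sparkPassThrough(i + 1)) else Seq()

    // Stand-in for WorkflowUtils.thirdPartyClasspaths.
    val thirdPartyClasspaths = Seq("/opt/elasticsearch")
    val extraClasspaths = driverClassPathPrefix ++ thirdPartyClasspaths
    // extraClasspaths == Seq("/usr/lib/extra", "/opt/elasticsearch")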
