diff --git a/bin/compute-classpath.sh b/bin/compute-classpath.sh
index 6bbc2da4cb..8c02f4df9a 100755
--- a/bin/compute-classpath.sh
+++ b/bin/compute-classpath.sh
@@ -55,5 +55,8 @@ fi
 if [ -n "$HBASE_CONF_DIR" ]; then
   CLASSPATH="$CLASSPATH:$HBASE_CONF_DIR"
 fi
+if [ -n "$ES_CONF_DIR" ]; then
+  CLASSPATH="$CLASSPATH:$ES_CONF_DIR"
+fi
 
 echo "$CLASSPATH"
diff --git a/conf/pio-env.sh.template b/conf/pio-env.sh.template
index afb597f588..ca109572fe 100644
--- a/conf/pio-env.sh.template
+++ b/conf/pio-env.sh.template
@@ -10,8 +10,13 @@
 # SPARK_HOME: Apache Spark is a hard dependency and must be configured.
 SPARK_HOME=/path_to_apache_spark
 
+# ES_CONF_DIR: You must configure this if you have advanced configuration for
+# your Elasticsearch setup.
+# ES_CONF_DIR=/opt/elasticsearch
+
 # HADOOP_CONF_DIR: You must configure this if you intend to run PredictionIO
 # with Hadoop 2.
+# HADOOP_CONF_DIR=/opt/hadoop
 
 # HBASE_CONF_DIR: You must configure this if you intend to run PredictionIO
 # with HBase on a remote cluster.
diff --git a/core/src/main/scala/workflow/WorkflowUtils.scala b/core/src/main/scala/workflow/WorkflowUtils.scala
index db5c16b17f..6dabbcb9b3 100644
--- a/core/src/main/scala/workflow/WorkflowUtils.scala
+++ b/core/src/main/scala/workflow/WorkflowUtils.scala
@@ -165,23 +165,31 @@ object WorkflowUtils extends Logging {
     s
   }
 
-  /** Detect available Hadoop ecosystem configuration files to be submitted as
+  /** Detect third party software configuration files to be submitted as
    * extras to Apache Spark. This makes sure all executors receive the same
    * configuration.
    */
-  def hadoopEcoConfFiles: Seq[String] = {
-    val ecoFiles = Map(
+  def thirdPartyConfFiles: Seq[String] = {
+    val thirdPartyFiles = Map(
+      "ES_CONF_DIR" -> "elasticsearch.yml",
       "HADOOP_CONF_DIR" -> "core-site.xml",
       "HBASE_CONF_DIR" -> "hbase-site.xml")
-    ecoFiles.keys.toSeq.map { k: String =>
+    thirdPartyFiles.keys.toSeq.map { k: String =>
       sys.env.get(k) map { x =>
-        val p = Seq(x, ecoFiles(k)).mkString(File.separator)
+        val p = Seq(x, thirdPartyFiles(k)).mkString(File.separator)
         if (new File(p).exists) Seq(p) else Seq[String]()
       } getOrElse Seq[String]()
     }.flatten
   }
 
+  def thirdPartyClasspaths: Seq[String] = {
+    val thirdPartyPaths = Seq("ES_CONF_DIR")
+    thirdPartyPaths.map(p =>
+      sys.env.get(p).map(Seq(_)).getOrElse(Seq[String]())
+    ).flatten
+  }
+
   def setupLogging(verbose: Boolean, debug: Boolean): Unit = {
     val layout = new PatternLayout("%d %-5p %c{2} - %m%n")
     val filter = new PIOFilter(verbose, debug)
diff --git a/tools/src/main/scala/Console.scala b/tools/src/main/scala/Console.scala
index 57943e61fa..4cb4b151f7 100644
--- a/tools/src/main/scala/Console.scala
+++ b/tools/src/main/scala/Console.scala
@@ -954,7 +954,7 @@ object Console extends Logging {
 
   def run(ca: ConsoleArgs): Unit = {
     compile(ca)
-    val extraFiles = WorkflowUtils.hadoopEcoConfFiles
+    val extraFiles = WorkflowUtils.thirdPartyConfFiles
 
     val jarFiles = jarFilesForScala
     jarFiles foreach { f => info(s"Found JAR: ${f.getName}") }
diff --git a/tools/src/main/scala/RunServer.scala b/tools/src/main/scala/RunServer.scala
index 637f1acbc9..6c9508c1b9 100644
--- a/tools/src/main/scala/RunServer.scala
+++ b/tools/src/main/scala/RunServer.scala
@@ -37,7 +37,17 @@ object RunServer extends Logging {
     val sparkHome = ca.common.sparkHome.getOrElse(
       sys.env.get("SPARK_HOME").getOrElse("."))
 
-    val extraFiles = WorkflowUtils.hadoopEcoConfFiles
+    val extraFiles = WorkflowUtils.thirdPartyConfFiles
+
+    val driverClassPathIndex =
+      ca.common.sparkPassThrough.indexOf("--driver-class-path")
+    val driverClassPathPrefix =
+      if (driverClassPathIndex != -1)
+        Seq(ca.common.sparkPassThrough(driverClassPathIndex + 1))
+      else
+        Seq()
+    val extraClasspaths =
+      driverClassPathPrefix ++ WorkflowUtils.thirdPartyClasspaths
 
     val sparkSubmit =
       Seq(Seq(sparkHome, "bin", "spark-submit").mkString(File.separator)) ++
@@ -50,7 +60,14 @@ object RunServer extends Logging {
         "--jars",
         (em.files ++ Console.builtinEngines(
           ca.common.pioHome.get).map(_.getCanonicalPath)).mkString(",")) ++
-      (if (extraFiles.size > 0) Seq("--files") ++ extraFiles else Seq()) ++
+      (if (extraFiles.size > 0)
+        Seq("--files", extraFiles.mkString(","))
+      else
+        Seq()) ++
+      (if (extraClasspaths.size > 0)
+        Seq("--driver-class-path", extraClasspaths.mkString(":"))
+      else
+        Seq()) ++
       Seq(
         core.getCanonicalPath,
         "--engineInstanceId",
diff --git a/tools/src/main/scala/RunWorkflow.scala b/tools/src/main/scala/RunWorkflow.scala
index 99677f63b7..170bd3b70a 100644
--- a/tools/src/main/scala/RunWorkflow.scala
+++ b/tools/src/main/scala/RunWorkflow.scala
@@ -51,7 +51,17 @@ object RunWorkflow extends Logging {
     val hadoopConf = new Configuration
     val hdfs = FileSystem.get(hadoopConf)
 
-    val extraFiles = WorkflowUtils.hadoopEcoConfFiles
+    val extraFiles = WorkflowUtils.thirdPartyConfFiles
+
+    val driverClassPathIndex =
+      ca.common.sparkPassThrough.indexOf("--driver-class-path")
+    val driverClassPathPrefix =
+      if (driverClassPathIndex != -1)
+        Seq(ca.common.sparkPassThrough(driverClassPathIndex + 1))
+      else
+        Seq()
+    val extraClasspaths =
+      driverClassPathPrefix ++ WorkflowUtils.thirdPartyClasspaths
 
     val workMode = ca.metricsClass.map(_ => "Evaluation").getOrElse("Training")
 
     val sparkSubmit =
@@ -65,7 +75,14 @@ object RunWorkflow extends Logging {
         "--jars",
         (em.files ++ Console.builtinEngines(
           ca.common.pioHome.get).map(_.getCanonicalPath)).mkString(",")) ++
-      (if (extraFiles.size > 0) Seq("--files") ++ extraFiles else Seq()) ++
+      (if (extraFiles.size > 0)
+        Seq("--files", extraFiles.mkString(","))
+      else
+        Seq()) ++
+      (if (extraClasspaths.size > 0)
+        Seq("--driver-class-path", extraClasspaths.mkString(":"))
+      else
+        Seq()) ++
       Seq(
         core.getCanonicalPath,
         "--env",
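
Note on the new --driver-class-path handling: below is a minimal standalone
Scala sketch (not part of the patch; the Seq values are hypothetical stand-ins
for ca.common.sparkPassThrough and WorkflowUtils.thirdPartyClasspaths) showing
how the pieces compose. Any --driver-class-path value the user already passes
through to spark-submit is kept as a prefix, and third party configuration
directories such as ES_CONF_DIR are appended, joined with ":".

    object DriverClassPathSketch extends App {
      // Hypothetical stand-ins for ca.common.sparkPassThrough and for
      // WorkflowUtils.thirdPartyClasspaths (which reads ES_CONF_DIR).
      val sparkPassThrough =
        Seq("--master", "local[4]", "--driver-class-path", "/opt/custom/lib")
      val thirdPartyClasspaths = Seq("/opt/elasticsearch")

      // Same logic as the patch: keep any user-supplied --driver-class-path
      // value as a prefix, then append the third party directories.
      val driverClassPathIndex = sparkPassThrough.indexOf("--driver-class-path")
      val driverClassPathPrefix =
        if (driverClassPathIndex != -1)
          Seq(sparkPassThrough(driverClassPathIndex + 1))
        else
          Seq()
      val extraClasspaths = driverClassPathPrefix ++ thirdPartyClasspaths

      // Prints: --driver-class-path /opt/custom/lib:/opt/elasticsearch
      println(
        Seq("--driver-class-path", extraClasspaths.mkString(":")).mkString(" "))
    }

This also mirrors why extraFiles is now joined with "," rather than appended as
separate arguments: spark-submit expects --files to be a single comma-separated
argument, and --driver-class-path to be a single colon-separated argument.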