Skip to content

Commit

Permalink
[FLINK-1984] port Mesos code to latest master
Browse files Browse the repository at this point in the history
- move Scala code to /scala dir
- remove merge commits
- update version

This closes apache#2315
  • Loading branch information
mxm committed Aug 29, 2016
1 parent 38a9534 commit 842e3e7
Show file tree
Hide file tree
Showing 5 changed files with 12 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -742,6 +742,10 @@ public final class ConfigConstants {
@PublicEvolving
public static final String HA_ZOOKEEPER_CHECKPOINT_COUNTER_PATH = "high-availability.zookeeper.path.checkpoint-counter";

/** ZooKeeper root path (ZNode) for Mesos workers. */
@PublicEvolving
public static final String HA_ZOOKEEPER_MESOS_WORKERS_PATH = "recovery.zookeeper.path.mesos-workers";

@PublicEvolving
public static final String HA_ZOOKEEPER_SESSION_TIMEOUT = "high-availability.zookeeper.client.session-timeout";

Expand Down Expand Up @@ -790,9 +794,6 @@ public final class ConfigConstants {
@Deprecated
public static final String ZOOKEEPER_CHECKPOINT_COUNTER_PATH = "recovery.zookeeper.path.checkpoint-counter";

/** ZooKeeper root path (ZNode) for Mesos workers. */
public static final String ZOOKEEPER_MESOS_WORKERS_PATH = "recovery.zookeeper.path.mesos-workers";

/** Deprecated in favour of {@link #HA_ZOOKEEPER_SESSION_TIMEOUT}. */
@Deprecated
public static final String ZOOKEEPER_SESSION_TIMEOUT = "recovery.zookeeper.client.session-timeout";
Expand Down
2 changes: 1 addition & 1 deletion flink-mesos/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ under the License.
<parent>
<groupId>org.apache.flink</groupId>
<artifactId>flink-parent</artifactId>
<version>1.1-SNAPSHOT</version>
<version>1.2-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.runtime.jobmanager.JobManager;
import org.apache.flink.runtime.jobmanager.MemoryArchivist;
import org.apache.flink.runtime.jobmanager.RecoveryMode;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.process.ProcessReaper;
import org.apache.flink.runtime.taskmanager.TaskManager;
Expand Down Expand Up @@ -481,11 +481,11 @@ public static MesosConfiguration createMesosConfig(Configuration flinkConfig, St

private static MesosWorkerStore createWorkerStore(Configuration flinkConfig) throws Exception {
MesosWorkerStore workerStore;
RecoveryMode recoveryMode = RecoveryMode.fromConfig(flinkConfig);
if (recoveryMode == RecoveryMode.STANDALONE) {
HighAvailabilityMode recoveryMode = HighAvailabilityMode.fromConfig(flinkConfig);
if (recoveryMode == HighAvailabilityMode.NONE) {
workerStore = new StandaloneMesosWorkerStore();
}
else if (recoveryMode == RecoveryMode.ZOOKEEPER) {
else if (recoveryMode == HighAvailabilityMode.ZOOKEEPER) {
// note: the store is responsible for closing the client.
CuratorFramework client = ZooKeeperUtils.startCuratorFramework(flinkConfig);
workerStore = ZooKeeperMesosWorkerStore.createMesosWorkerStore(client, flinkConfig);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ public static ZooKeeperMesosWorkerStore createMesosWorkerStore(
ZooKeeperUtils.createFileSystemStateStorage(configuration, "mesosWorkerStore");

String zooKeeperMesosWorkerStorePath = configuration.getString(
ConfigConstants.ZOOKEEPER_MESOS_WORKERS_PATH,
ConfigConstants.HA_ZOOKEEPER_MESOS_WORKERS_PATH,
ConfigConstants.DEFAULT_ZOOKEEPER_MESOS_WORKERS_PATH
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import java.util.concurrent.ExecutorService

import akka.actor.ActorRef
import org.apache.flink.api.common.JobID
import org.apache.flink.configuration.{Configuration => FlinkConfiguration}
import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore
import org.apache.flink.runtime.clusterframework.messages._
Expand Down Expand Up @@ -57,7 +57,7 @@ import scala.language.postfixOps
* @param leaderElectionService LeaderElectionService to participate in the leader election
*/
abstract class ContaineredJobManager(
flinkConfiguration: FlinkConfiguration,
flinkConfiguration: Configuration,
executorService: ExecutorService,
instanceManager: InstanceManager,
scheduler: FlinkScheduler,
Expand Down

0 comments on commit 842e3e7

Please sign in to comment.