Skip to content

Commit

Permalink
[FLINK-8490] Allow custom docker parameters for docker tasks on Mesos
Browse files Browse the repository at this point in the history
This closes apache#5346.
  • Loading branch information
lishim authored and tillrohrmann committed Jan 26, 2018
1 parent a6d7f2d commit 6969fe2
Show file tree
Hide file tree
Showing 8 changed files with 92 additions and 1 deletion.
2 changes: 2 additions & 0 deletions docs/ops/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -512,6 +512,8 @@ May be set to -1 to disable this feature.

- `mesos.resourcemanager.tasks.container.volumes`: A comma separated list of `[host_path:]`container_path`[:RO|RW]`. This allows for mounting additional volumes into your container. (**NO DEFAULT**)

- `mesos.resourcemanager.tasks.container.docker.parameters`: Custom parameters to be passed into docker run command when using the docker containerizer. Comma separated list of `key=value` pairs. `value` may contain '=' (**NO DEFAULT**)

- `high-availability.zookeeper.path.mesos-workers`: The ZooKeeper root path for persisting the Mesos worker information.

### High Availability (HA)
Expand Down
2 changes: 2 additions & 0 deletions docs/ops/deployment/mesos.md
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,8 @@ May be set to -1 to disable this feature.

`mesos.resourcemanager.tasks.container.volumes`: A comma separated list of `[host_path:]`container_path`[:RO|RW]`. This allows for mounting additional volumes into your container. (**NO DEFAULT**)

`mesos.resourcemanager.tasks.container.docker.parameters`: Custom parameters to be passed into docker run command when using the docker containerizer. Comma separated list of `key=value` pairs. `value` may contain '=' (**NO DEFAULT**)

`mesos.resourcemanager.tasks.hostname`: Optional value to define the TaskManager's hostname. The pattern `_TASK_` is replaced by the actual id of the Mesos task. This can be used to configure the TaskManager to use Mesos DNS (e.g. `_TASK_.flink-service.mesos`) for name lookups. (**NO DEFAULT**)

`mesos.resourcemanager.tasks.bootstrap-cmd`: A command which is executed before the TaskManager is started (**NO DEFAULT**).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,7 @@ public Protos.TaskInfo launch(Protos.SlaveID slaveId, MesosResourceAllocation al
containerInfo
.setType(Protos.ContainerInfo.Type.DOCKER)
.setDocker(Protos.ContainerInfo.DockerInfo.newBuilder()
.addAllParameters(params.dockerParameters())
.setNetwork(Protos.ContainerInfo.DockerInfo.Network.HOST)
.setImage(params.containerImageName().get()));
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -672,6 +672,7 @@ private LaunchableMesosWorker createLaunchableMesosWorker(Protos.TaskID taskID,
1,
new HashMap<>(taskManagerParameters.containeredParameters().taskManagerEnv())),
taskManagerParameters.containerVolumes(),
taskManagerParameters.dockerParameters(),
taskManagerParameters.constraints(),
taskManagerParameters.command(),
taskManagerParameters.bootstrapCommand(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ public class MesosTaskManagerParameters {
key("mesos.resourcemanager.tasks.container.volumes")
.noDefaultValue();

public static final ConfigOption<String> MESOS_RM_CONTAINER_DOCKER_PARAMETERS =
key("mesos.resourcemanager.tasks.container.docker.parameters")
.noDefaultValue();

public static final ConfigOption<String> MESOS_CONSTRAINTS_HARD_HOSTATTR =
key("mesos.constraints.hard.hostattribute")
.noDefaultValue();
Expand All @@ -109,6 +113,8 @@ public class MesosTaskManagerParameters {

private final List<Protos.Volume> containerVolumes;

private final List<Protos.Parameter> dockerParameters;

private final List<ConstraintEvaluator> constraints;

private final String command;
Expand All @@ -123,6 +129,7 @@ public MesosTaskManagerParameters(
Option<String> containerImageName,
ContaineredTaskManagerParameters containeredParameters,
List<Protos.Volume> containerVolumes,
List<Protos.Parameter> dockerParameters,
List<ConstraintEvaluator> constraints,
String command,
Option<String> bootstrapCommand,
Expand All @@ -133,6 +140,7 @@ public MesosTaskManagerParameters(
this.containerImageName = Preconditions.checkNotNull(containerImageName);
this.containeredParameters = Preconditions.checkNotNull(containeredParameters);
this.containerVolumes = Preconditions.checkNotNull(containerVolumes);
this.dockerParameters = Preconditions.checkNotNull(dockerParameters);
this.constraints = Preconditions.checkNotNull(constraints);
this.command = Preconditions.checkNotNull(command);
this.bootstrapCommand = Preconditions.checkNotNull(bootstrapCommand);
Expand Down Expand Up @@ -176,6 +184,13 @@ public List<Protos.Volume> containerVolumes() {
return containerVolumes;
}

/**
* Get Docker runtime parameters.
*/
public List<Protos.Parameter> dockerParameters() {
return dockerParameters;
}

/**
* Get the placement constraints.
*/
Expand Down Expand Up @@ -212,6 +227,7 @@ public String toString() {
", containerImageName=" + containerImageName +
", containeredParameters=" + containeredParameters +
", containerVolumes=" + containerVolumes +
", dockerParameters=" + dockerParameters +
", constraints=" + constraints +
", taskManagerHostName=" + taskManagerHostname +
", command=" + command +
Expand Down Expand Up @@ -260,8 +276,12 @@ public static MesosTaskManagerParameters create(Configuration flinkConfig) {

Option<String> containerVolOpt = Option.<String>apply(flinkConfig.getString(MESOS_RM_CONTAINER_VOLUMES));

Option<String> dockerParamsOpt = Option.<String>apply(flinkConfig.getString(MESOS_RM_CONTAINER_DOCKER_PARAMETERS));

List<Protos.Volume> containerVolumes = buildVolumes(containerVolOpt);

List<Protos.Parameter> dockerParameters = buildDockerParameters(dockerParamsOpt);

//obtain Task Manager Host Name from the configuration
Option<String> taskManagerHostname = Option.apply(flinkConfig.getString(MESOS_TM_HOSTNAME));

Expand All @@ -275,6 +295,7 @@ public static MesosTaskManagerParameters create(Configuration flinkConfig) {
Option.apply(imageName),
containeredParameters,
containerVolumes,
dockerParameters,
constraints,
tmCommand,
tmBootstrapCommand,
Expand Down Expand Up @@ -365,6 +386,32 @@ public static List<Protos.Volume> buildVolumes(Option<String> containerVolumes)
}
}

public static List<Protos.Parameter> buildDockerParameters(Option<String> dockerParameters) {
if (dockerParameters.isEmpty()) {
return Collections.emptyList();
} else {
String[] dockerParameterSpecifications = dockerParameters.get().split(",");

List<Protos.Parameter> parameters = new ArrayList<>(dockerParameterSpecifications.length);

for (String dockerParameterSpecification : dockerParameterSpecifications) {
if (!dockerParameterSpecification.trim().isEmpty()) {
// split with the limit of 2 in case the value includes '='
String[] match = dockerParameterSpecification.split("=", 2);
if (match.length != 2) {
throw new IllegalArgumentException("Docker parameter specification is invalid, given: "
+ dockerParameterSpecification);
}
Protos.Parameter.Builder parameter = Protos.Parameter.newBuilder();
parameter.setKey(match[0]);
parameter.setValue(match[1]);
parameters.add(parameter.build());
}
}
return parameters;
}
}

/**
* The supported containerizers.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@ public void initialize() {
Option.<String>empty(),
containeredParams,
Collections.<Protos.Volume>emptyList(),
Collections.<Protos.Parameter>emptyList(),
Collections.<ConstraintEvaluator>emptyList(),
"",
Option.<String>empty(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,8 @@ static class Context implements AutoCloseable {
new ContaineredTaskManagerParameters(1024, 768, 256, 4, new HashMap<String, String>());
MesosTaskManagerParameters tmParams = new MesosTaskManagerParameters(
1.0, MesosTaskManagerParameters.ContainerType.MESOS, Option.<String>empty(), containeredParams,
Collections.<Protos.Volume>emptyList(), Collections.<ConstraintEvaluator>emptyList(), "", Option.<String>empty(),
Collections.<Protos.Volume>emptyList(), Collections.<Protos.Parameter>emptyList(),
Collections.<ConstraintEvaluator>emptyList(), "", Option.<String>empty(),
Option.<String>empty());

// resource manager
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,42 @@ public void testContainerVolumes() throws Exception {
assertEquals(Protos.Volume.Mode.RO, params.containerVolumes().get(0).getMode());
}

@Test
public void testContainerDockerParameter() throws Exception {
Configuration config = new Configuration();
config.setString(MesosTaskManagerParameters.MESOS_RM_CONTAINER_DOCKER_PARAMETERS, "testKey=testValue");

MesosTaskManagerParameters params = MesosTaskManagerParameters.create(config);
assertEquals(params.dockerParameters().size(), 1);
assertEquals(params.dockerParameters().get(0).getKey(), "testKey");
assertEquals(params.dockerParameters().get(0).getValue(), "testValue");
}

@Test
public void testContainerDockerParameters() throws Exception {
Configuration config = new Configuration();
config.setString(MesosTaskManagerParameters.MESOS_RM_CONTAINER_DOCKER_PARAMETERS,
"testKey1=testValue1,testKey2=testValue2,testParam3=key3=value3,testParam4=\"key4=value4\"");

MesosTaskManagerParameters params = MesosTaskManagerParameters.create(config);
assertEquals(params.dockerParameters().size(), 4);
assertEquals(params.dockerParameters().get(0).getKey(), "testKey1");
assertEquals(params.dockerParameters().get(0).getValue(), "testValue1");
assertEquals(params.dockerParameters().get(1).getKey(), "testKey2");
assertEquals(params.dockerParameters().get(1).getValue(), "testValue2");
assertEquals(params.dockerParameters().get(2).getKey(), "testParam3");
assertEquals(params.dockerParameters().get(2).getValue(), "key3=value3");
assertEquals(params.dockerParameters().get(3).getKey(), "testParam4");
assertEquals(params.dockerParameters().get(3).getValue(), "\"key4=value4\"");
}

@Test(expected = IllegalArgumentException.class)
public void testContainerDockerParametersMalformed() throws Exception {
Configuration config = new Configuration();
config.setString(MesosTaskManagerParameters.MESOS_RM_CONTAINER_DOCKER_PARAMETERS, "badParam");
MesosTaskManagerParameters params = MesosTaskManagerParameters.create(config);
}

@Test
public void givenTwoConstraintsInConfigShouldBeParsed() throws Exception {

Expand Down

0 comments on commit 6969fe2

Please sign in to comment.