From 036197e21f16769f937cde610bda0ddc502783e2 Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Mon, 20 Apr 2020 13:58:19 +0200 Subject: [PATCH] [FLINK-17266] Make WorkerResourceSpec serializable The MesosWorkerStore requires that the WorkerResourceSpec is serializable because it stores it as part of MesosWorkerStore.Worker in serialized format. This closes #11821. --- .../store/MesosWorkerStoreTest.java | 51 +++++++++++++++++++ .../resourcemanager/WorkerResourceSpec.java | 5 +- 2 files changed, 55 insertions(+), 1 deletion(-) create mode 100644 flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/store/MesosWorkerStoreTest.java diff --git a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/store/MesosWorkerStoreTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/store/MesosWorkerStoreTest.java new file mode 100644 index 0000000000000..e454b52ef7ab2 --- /dev/null +++ b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/store/MesosWorkerStoreTest.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.mesos.runtime.clusterframework.store; + +import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec; +import org.apache.flink.util.InstantiationUtil; +import org.apache.flink.util.TestLogger; + +import org.apache.mesos.Protos; +import org.junit.Test; + +import static org.junit.Assert.assertTrue; + +/** + * Tests for the {@link MesosWorkerStore}. + */ +public class MesosWorkerStoreTest extends TestLogger { + + /** + * Tests that {@link MesosWorkerStore.Worker} is serializable. + */ + @Test + public void workerIsSerializable() { + final Protos.TaskID taskId = Protos.TaskID.newBuilder().setValue("foobar").build(); + final WorkerResourceSpec workerResourceSpec = new WorkerResourceSpec.Builder().build(); + final Protos.SlaveID slaveId = Protos.SlaveID.newBuilder().setValue("barfoo").build(); + final String hostname = "foobar"; + + final MesosWorkerStore.Worker worker = MesosWorkerStore.Worker.newWorker(taskId, workerResourceSpec); + final MesosWorkerStore.Worker launchedWorker = worker.launchWorker(slaveId, hostname); + + assertTrue(InstantiationUtil.isSerializable(worker)); + assertTrue(InstantiationUtil.isSerializable(launchedWorker)); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/WorkerResourceSpec.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/WorkerResourceSpec.java index d5b7cb41add74..6edbfe4d48d07 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/WorkerResourceSpec.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/WorkerResourceSpec.java @@ -23,12 +23,15 @@ import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec; import org.apache.flink.util.Preconditions; +import java.io.Serializable; import java.util.Objects; /** * Resource specification of a worker, mainly used by SlotManager requesting from ResourceManager. */ -public class WorkerResourceSpec { +public final class WorkerResourceSpec implements Serializable { + + private static final long serialVersionUID = 1L; public static final WorkerResourceSpec ZERO = new Builder().build();