Skip to content

Commit

Permalink
[FLINK-26758][container] Migrate tests to JUnit 5
Browse files Browse the repository at this point in the history
  • Loading branch information
RocMarshal committed Mar 22, 2022
1 parent 7acfa5e commit 950cd16
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 82 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,40 +28,31 @@
import org.apache.flink.runtime.entrypoint.parser.CommandLineParser;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;

import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Optional;
import java.util.Properties;

import static org.hamcrest.Matchers.arrayContaining;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasEntry;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.core.IsNull.nullValue;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.Fail.fail;

/** Tests for the {@link StandaloneApplicationClusterConfigurationParserFactory}. */
public class StandaloneApplicationClusterConfigurationParserFactoryTest extends TestLogger {
class StandaloneApplicationClusterConfigurationParserFactoryTest {

@Rule public TemporaryFolder tempFolder = new TemporaryFolder();
private File confFile;
private String confDirPath;

@Before
public void createEmptyFlinkConfiguration() throws IOException {
File confDir = tempFolder.getRoot();
confDirPath = confDir.getAbsolutePath();
confFile = new File(confDir, GlobalConfiguration.FLINK_CONF_FILENAME);
@BeforeEach
void createEmptyFlinkConfiguration(@TempDir Path tempFolder) throws IOException {
confDirPath = tempFolder.toFile().getAbsolutePath();
confFile = new File(tempFolder.toFile(), GlobalConfiguration.FLINK_CONF_FILENAME);
confFile.createNewFile();
}

Expand All @@ -72,8 +63,7 @@ public void createEmptyFlinkConfiguration() throws IOException {
private static final String JOB_CLASS_NAME = "foobar";

@Test
public void testEntrypointClusterConfigurationToConfigurationParsing()
throws FlinkParseException {
void testEntrypointClusterConfigurationToConfigurationParsing() throws FlinkParseException {
final JobID jobID = JobID.generate();
final SavepointRestoreSettings savepointRestoreSettings =
SavepointRestoreSettings.forPath("/test/savepoint/path", true);
Expand Down Expand Up @@ -101,25 +91,24 @@ public void testEntrypointClusterConfigurationToConfigurationParsing()

final StandaloneApplicationClusterConfiguration clusterConfiguration =
commandLineParser.parse(args);
assertThat(clusterConfiguration.getJobClassName(), is(equalTo(JOB_CLASS_NAME)));
assertThat(clusterConfiguration.getArgs(), arrayContaining(arg1, arg2));
assertThat(clusterConfiguration.getJobClassName()).isEqualTo(JOB_CLASS_NAME);
assertThat(clusterConfiguration.getArgs()).contains(arg1, arg2);

final Configuration configuration =
StandaloneApplicationClusterEntryPoint.loadConfigurationFromClusterConfig(
clusterConfiguration);

final String strJobId = configuration.get(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID);
assertThat(JobID.fromHexString(strJobId), is(equalTo(jobID)));
assertThat(
SavepointRestoreSettings.fromConfiguration(configuration),
is(equalTo(savepointRestoreSettings)));
assertThat(JobID.fromHexString(strJobId)).isEqualTo(jobID);
assertThat(SavepointRestoreSettings.fromConfiguration(configuration))
.isEqualTo(savepointRestoreSettings);

assertThat(configuration.get(RestOptions.PORT), is(equalTo(restPort)));
assertThat(configuration.get(DeploymentOptions.TARGET), is(equalTo(value)));
assertThat(configuration.get(RestOptions.PORT)).isEqualTo(restPort);
assertThat(configuration.get(DeploymentOptions.TARGET)).isEqualTo(value);
}

@Test
public void testEntrypointClusterConfigWOSavepointSettingsToConfigurationParsing()
void testEntrypointClusterConfigWOSavepointSettingsToConfigurationParsing()
throws FlinkParseException {
final JobID jobID = JobID.generate();
final String[] args = {"-c", confDirPath, "--job-id", jobID.toHexString()};
Expand All @@ -131,14 +120,13 @@ public void testEntrypointClusterConfigWOSavepointSettingsToConfigurationParsing
clusterConfiguration);

final String strJobId = configuration.get(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID);
assertThat(JobID.fromHexString(strJobId), is(equalTo(jobID)));
assertThat(
SavepointRestoreSettings.fromConfiguration(configuration),
is(equalTo(SavepointRestoreSettings.none())));
assertThat(JobID.fromHexString(strJobId)).isEqualTo(jobID);
assertThat(SavepointRestoreSettings.fromConfiguration(configuration))
.isEqualTo(SavepointRestoreSettings.none());
}

@Test
public void testEntrypointClusterConfigurationParsing() throws FlinkParseException {
void testEntrypointClusterConfigurationParsing() throws FlinkParseException {
final String key = "key";
final String value = "value";
final int restPort = 1234;
Expand All @@ -159,50 +147,48 @@ public void testEntrypointClusterConfigurationParsing() throws FlinkParseExcepti
final StandaloneApplicationClusterConfiguration clusterConfiguration =
commandLineParser.parse(args);

assertThat(clusterConfiguration.getConfigDir(), is(equalTo(confDirPath)));
assertThat(clusterConfiguration.getJobClassName(), is(equalTo(JOB_CLASS_NAME)));
assertThat(clusterConfiguration.getRestPort(), is(equalTo(restPort)));
assertThat(clusterConfiguration.getConfigDir()).isEqualTo(confDirPath);
assertThat(clusterConfiguration.getJobClassName()).isEqualTo(JOB_CLASS_NAME);
assertThat(clusterConfiguration.getRestPort()).isEqualTo(restPort);
final Properties dynamicProperties = clusterConfiguration.getDynamicProperties();

assertThat(dynamicProperties, hasEntry(key, value));
assertThat(dynamicProperties).containsEntry(key, value);

assertThat(clusterConfiguration.getArgs(), arrayContaining(arg1, arg2));
assertThat(clusterConfiguration.getArgs()).contains(arg1, arg2);

assertThat(
clusterConfiguration.getSavepointRestoreSettings(),
is(equalTo(SavepointRestoreSettings.none())));
assertThat(clusterConfiguration.getSavepointRestoreSettings())
.isEqualTo(SavepointRestoreSettings.none());

assertThat(clusterConfiguration.getJobId(), is(nullValue()));
assertThat(clusterConfiguration.getJobId()).isNull();
}

@Test
public void testOnlyRequiredArguments() throws FlinkParseException {
void testOnlyRequiredArguments() throws FlinkParseException {
final String[] args = {"--configDir", confDirPath};

final StandaloneApplicationClusterConfiguration clusterConfiguration =
commandLineParser.parse(args);

assertThat(clusterConfiguration.getConfigDir(), is(equalTo(confDirPath)));
assertThat(clusterConfiguration.getDynamicProperties(), is(equalTo(new Properties())));
assertThat(clusterConfiguration.getArgs(), is(new String[0]));
assertThat(clusterConfiguration.getRestPort(), is(equalTo(-1)));
assertThat(clusterConfiguration.getHostname(), is(nullValue()));
assertThat(
clusterConfiguration.getSavepointRestoreSettings(),
is(equalTo(SavepointRestoreSettings.none())));
assertThat(clusterConfiguration.getJobId(), is(nullValue()));
assertThat(clusterConfiguration.getJobClassName(), is(nullValue()));
assertThat(clusterConfiguration.getConfigDir()).isEqualTo(confDirPath);
assertThat(clusterConfiguration.getDynamicProperties()).isEqualTo(new Properties());
assertThat(clusterConfiguration.getArgs()).isEqualTo(new String[0]);
assertThat(clusterConfiguration.getRestPort()).isEqualTo(-1);
assertThat(clusterConfiguration.getHostname()).isNull();
assertThat(clusterConfiguration.getSavepointRestoreSettings())
.isEqualTo(SavepointRestoreSettings.none());
assertThat(clusterConfiguration.getJobId()).isNull();
assertThat(clusterConfiguration.getJobClassName()).isNull();
}

@Test(expected = FlinkParseException.class)
public void testMissingRequiredArgument() throws FlinkParseException {
@Test
void testMissingRequiredArgument() {
final String[] args = {};

commandLineParser.parse(args);
assertThatThrownBy(() -> commandLineParser.parse(args))
.isInstanceOf(FlinkParseException.class);
}

@Test
public void testSavepointRestoreSettingsParsing() throws FlinkParseException {
void testSavepointRestoreSettingsParsing() throws FlinkParseException {
final String restorePath = "foobar";
final String[] args = {"-c", confDirPath, "-j", JOB_CLASS_NAME, "-s", restorePath, "-n"};
final StandaloneApplicationClusterConfiguration standaloneApplicationClusterConfiguration =
Expand All @@ -211,13 +197,13 @@ public void testSavepointRestoreSettingsParsing() throws FlinkParseException {
final SavepointRestoreSettings savepointRestoreSettings =
standaloneApplicationClusterConfiguration.getSavepointRestoreSettings();

assertThat(savepointRestoreSettings.restoreSavepoint(), is(true));
assertThat(savepointRestoreSettings.getRestorePath(), is(equalTo(restorePath)));
assertThat(savepointRestoreSettings.allowNonRestoredState(), is(true));
assertThat(savepointRestoreSettings.restoreSavepoint()).isTrue();
assertThat(savepointRestoreSettings.getRestorePath()).isEqualTo(restorePath);
assertThat(savepointRestoreSettings.allowNonRestoredState()).isTrue();
}

@Test
public void testSetJobIdManually() throws FlinkParseException {
void testSetJobIdManually() throws FlinkParseException {
final JobID jobId = new JobID();
final String[] args = {
"--configDir", confDirPath, "--job-classname", "foobar", "--job-id", jobId.toString()
Expand All @@ -226,11 +212,11 @@ public void testSetJobIdManually() throws FlinkParseException {
final StandaloneApplicationClusterConfiguration standaloneApplicationClusterConfiguration =
commandLineParser.parse(args);

assertThat(standaloneApplicationClusterConfiguration.getJobId(), is(equalTo(jobId)));
assertThat(standaloneApplicationClusterConfiguration.getJobId()).isEqualTo(jobId);
}

@Test
public void testInvalidJobIdThrows() {
void testInvalidJobIdThrows() {
final String invalidJobId = "0xINVALID";
final String[] args = {
"--configDir", confDirPath, "--job-classname", "foobar", "--job-id", invalidJobId
Expand All @@ -242,13 +228,13 @@ public void testInvalidJobIdThrows() {
} catch (FlinkParseException e) {
Optional<IllegalArgumentException> cause =
ExceptionUtils.findThrowable(e, IllegalArgumentException.class);
assertTrue(cause.isPresent());
assertThat(cause.get().getMessage(), containsString(invalidJobId));
assertThat(cause).isPresent();
assertThat(cause.get().getMessage()).containsSequence(invalidJobId);
}
}

@Test
public void testShortOptions() throws FlinkParseException {
void testShortOptions() throws FlinkParseException {
final String jobClassName = "foobar";
final JobID jobId = new JobID();
final String savepointRestorePath = "s3:https://foo/bar";
Expand All @@ -264,25 +250,25 @@ public void testShortOptions() throws FlinkParseException {
final StandaloneApplicationClusterConfiguration clusterConfiguration =
commandLineParser.parse(args);

assertThat(clusterConfiguration.getConfigDir(), is(equalTo(confDirPath)));
assertThat(clusterConfiguration.getJobClassName(), is(equalTo(jobClassName)));
assertThat(clusterConfiguration.getJobId(), is(equalTo(jobId)));
assertThat(clusterConfiguration.getConfigDir()).isEqualTo(confDirPath);
assertThat(clusterConfiguration.getJobClassName()).isEqualTo(jobClassName);
assertThat(clusterConfiguration.getJobId()).isEqualTo(jobId);

final SavepointRestoreSettings savepointRestoreSettings =
clusterConfiguration.getSavepointRestoreSettings();
assertThat(savepointRestoreSettings.restoreSavepoint(), is(true));
assertThat(savepointRestoreSettings.getRestorePath(), is(equalTo(savepointRestorePath)));
assertThat(savepointRestoreSettings.allowNonRestoredState(), is(true));
assertThat(savepointRestoreSettings.restoreSavepoint()).isTrue();
assertThat(savepointRestoreSettings.getRestorePath()).isEqualTo(savepointRestorePath);
assertThat(savepointRestoreSettings.allowNonRestoredState()).isTrue();
}

@Test
public void testHostOption() throws FlinkParseException {
void testHostOption() throws FlinkParseException {
final String hostName = "user-specified-hostname";
final String[] args = {
"--configDir", confDirPath, "--job-classname", "foobar", "--host", hostName
};
final StandaloneApplicationClusterConfiguration applicationClusterConfiguration =
commandLineParser.parse(args);
assertThat(applicationClusterConfiguration.getHostname(), is(hostName));
assertThat(applicationClusterConfiguration.getHostname()).isEqualTo(hostName);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http:https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

org.apache.flink.util.TestLoggerExtension
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;

import org.junit.Test;
import org.junit.jupiter.api.Test;

import java.util.Arrays;
import java.util.HashMap;
Expand All @@ -37,7 +37,7 @@
import static org.assertj.core.api.Assertions.assertThat;

/** Tests for {@link PrintTableSinkFactory}. */
public class PrintSinkFactoryTest {
class PrintSinkFactoryTest {

private static final ResolvedSchema SCHEMA =
ResolvedSchema.of(
Expand All @@ -46,7 +46,7 @@ public class PrintSinkFactoryTest {
Column.physical("f2", DataTypes.BIGINT()));

@Test
public void testPrint() {
void testPrint() {
Map<String, String> properties = new HashMap<>();
properties.put("connector", "print");
properties.put(PrintConnectorOptions.PRINT_IDENTIFIER.key(), "my_print");
Expand Down

0 comments on commit 950cd16

Please sign in to comment.