Skip to content

Commit

Permalink
[hotfix] Remove various deprecated API usages
Browse files Browse the repository at this point in the history
  • Loading branch information
zentol committed Nov 2, 2022
1 parent 2df4f53 commit 8c28fd3
Show file tree
Hide file tree
Showing 5 changed files with 9 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,9 @@ public static void main(String[] args) throws Exception {
DataStream<Tuple> rows = sEnv.addSource(generator);

DataStream<Tuple> result =
rows.keyBy(1).window(TumblingProcessingTimeWindows.of(Time.seconds(5))).sum(0);
rows.keyBy(tuple -> tuple.getField(1))
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.sum(0);

result.writeAsText(outputPath + "/result.txt", FileSystem.WriteMode.OVERWRITE)
.setParallelism(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import static org.hamcrest.Matchers.hasItem;
import static org.junit.Assert.assertThat;
import static org.assertj.core.api.Assertions.assertThat;

/** Test utils for metric reporters. */
public class MetricReporterTestUtils {
Expand All @@ -49,6 +48,6 @@ public static void testMetricReporterSetupViaSPI(
false)
.map(MetricReporterFactory::getClass)
.collect(Collectors.toSet());
assertThat(loadedFactories, hasItem(clazz));
assertThat(loadedFactories).contains(clazz);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.metrics.jmx.JMXReporterFactory;
import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
Expand Down Expand Up @@ -98,17 +97,7 @@ void testJobManagerJMXMetricAccess(@InjectClusterClient ClusterClient<?> client)

final JobCheckpointingSettings jobCheckpointingSettings =
new JobCheckpointingSettings(
new CheckpointCoordinatorConfiguration(
500,
500,
50,
5,
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
true,
false,
0,
0),
null);
CheckpointCoordinatorConfiguration.builder().build(), null);

final JobGraph jobGraph =
JobGraphBuilder.newStreamingJobGraphBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ private static void randomizeConfiguration(MiniCluster miniCluster, Configuratio
randomize(conf, ExecutionCheckpointingOptions.ENABLE_UNALIGNED, true, false);
randomize(
conf,
ExecutionCheckpointingOptions.ALIGNMENT_TIMEOUT,
ExecutionCheckpointingOptions.ALIGNED_CHECKPOINT_TIMEOUT,
Duration.ofSeconds(0),
Duration.ofMillis(100),
Duration.ofSeconds(2));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.PrintStream;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -235,7 +236,7 @@ private static String getYarnClasspath() {
File classPathFile =
TestUtils.findFile(start, (dir, name) -> name.equals("yarn.classpath"));
return FileUtils.readFileToString(
classPathFile); // potential NPE is supposed to be fatal
classPathFile, StandardCharsets.UTF_8); // potential NPE is supposed to be fatal
} catch (Throwable t) {
LOG.error(
"Error while getting YARN classpath in {}",
Expand Down

0 comments on commit 8c28fd3

Please sign in to comment.