Skip to content

Commit

Permalink
[FLINK-18343][e2e] Refactor file-line replacement into separate method
Browse files Browse the repository at this point in the history
  • Loading branch information
zentol committed Jun 19, 2020
1 parent c8e9d0d commit febe8e3
Show file tree
Hide file tree
Showing 3 changed files with 145 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.flink.tests.util.TestUtils;
import org.apache.flink.tests.util.activation.OperatingSystemRestriction;
import org.apache.flink.tests.util.cache.DownloadCache;
import org.apache.flink.tests.util.util.FileUtils;
import org.apache.flink.util.OperatingSystem;

import org.junit.rules.TemporaryFolder;
Expand All @@ -33,15 +34,12 @@
import javax.annotation.Nullable;

import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.PrintStream;
import java.io.PrintWriter;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -115,22 +113,18 @@ private void setupKafkaDist() throws IOException {
.build());

LOG.info("Updating ZooKeeper properties");
final Path zookeeperPropertiesFile = kafkaDir.resolve(Paths.get("config", "zookeeper.properties"));
final List<String> zookeeperPropertiesFileLines = Files.readAllLines(zookeeperPropertiesFile);
try (PrintWriter pw = new PrintWriter(new OutputStreamWriter(Files.newOutputStream(zookeeperPropertiesFile, StandardOpenOption.TRUNCATE_EXISTING), StandardCharsets.UTF_8.name()))) {
zookeeperPropertiesFileLines.stream()
.map(line -> ZK_DATA_DIR_PATTERN.matcher(line).replaceAll("$1" + kafkaDir.resolve("zookeeper").toAbsolutePath()))
.forEachOrdered(pw::println);
}
FileUtils.replace(
kafkaDir.resolve(Paths.get("config", "zookeeper.properties")),
ZK_DATA_DIR_PATTERN,
matcher -> matcher.replaceAll("$1" + kafkaDir.resolve("zookeeper").toAbsolutePath())
);

LOG.info("Updating Kafka properties");
final Path kafkaPropertiesFile = kafkaDir.resolve(Paths.get("config", "server.properties"));
final List<String> kafkaPropertiesFileLines = Files.readAllLines(kafkaPropertiesFile);
try (PrintWriter pw = new PrintWriter(new OutputStreamWriter(Files.newOutputStream(kafkaPropertiesFile, StandardOpenOption.TRUNCATE_EXISTING), StandardCharsets.UTF_8.name()))) {
kafkaPropertiesFileLines.stream()
.map(line -> KAFKA_LOG_DIR_PATTERN.matcher(line).replaceAll("$1" + kafkaDir.resolve("kafka").toAbsolutePath()))
.forEachOrdered(pw::println);
}
FileUtils.replace(
kafkaDir.resolve(Paths.get("config", "server.properties")),
KAFKA_LOG_DIR_PATTERN,
matcher -> matcher.replaceAll("$1" + kafkaDir.resolve("kafka").toAbsolutePath())
);
}

private void setupKafkaCluster() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.tests.util.util;

import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.List;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
* Collection of file-related utilities.
*/
public class FileUtils {

/**
* Matches the given {@link Pattern} against all lines in the given file, and replaces all matches with the replacement
* generated by the given {@link Function}.
* All unmatched lines and provided replacements are written into the file, with the order corresponding to the
* original content. Newlines are automatically added to each line; this implies that an empty replacement string
* will result in an empty line to be written.
*/
public static void replace(Path file, Pattern pattern, Function<Matcher, String> replacer) throws IOException {
final List<String> fileLines = Files.readAllLines(file);
try (PrintWriter pw = new PrintWriter(new OutputStreamWriter(Files.newOutputStream(file, StandardOpenOption.TRUNCATE_EXISTING), StandardCharsets.UTF_8.name()))) {
for (String fileLine : fileLines) {
Matcher matcher = pattern.matcher(fileLine);
if (matcher.matches()) {
String replacement = replacer.apply(matcher);
pw.println(replacement);
} else {
pw.println(fileLine);
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.tests.util.util;

import org.apache.flink.util.TestLogger;

import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.regex.Pattern;

/**
* Tests for {@link FileUtils}.
*/
public class FileUtilsTest extends TestLogger {

@ClassRule
public static final TemporaryFolder TMP = new TemporaryFolder();

private static final List<String> ORIGINAL_LINES = Collections.unmodifiableList(Arrays.asList("line1", "line2", "line3"));
private Path testFile;

@Before
public void setupFile() throws IOException {
Path path = TMP.newFile().toPath();

Files.write(path, ORIGINAL_LINES);

testFile = path;
}

@Test
public void replaceSingleMatch() throws IOException {
FileUtils.replace(testFile, Pattern.compile("line1"), matcher -> "removed");

Assert.assertEquals(Arrays.asList("removed", ORIGINAL_LINES.get(1), ORIGINAL_LINES.get(2)), Files.readAllLines(testFile));
}

@Test
public void replaceMultipleMatch() throws IOException {
FileUtils.replace(testFile, Pattern.compile("line(.*)"), matcher -> matcher.group(1));

Assert.assertEquals(Arrays.asList("1", "2", "3"), Files.readAllLines(testFile));
}

@Test
public void replaceWithEmptyLine() throws IOException {
FileUtils.replace(testFile, Pattern.compile("line2"), matcher -> "");

Assert.assertEquals(Arrays.asList(ORIGINAL_LINES.get(0), "", ORIGINAL_LINES.get(2)), Files.readAllLines(testFile));
}
}

0 comments on commit febe8e3

Please sign in to comment.