Skip to content

Commit

Permalink
[FLINK-23429][state-processor-api] Use Path instead of Path.getPath()…
Browse files Browse the repository at this point in the history
… to preserve FileSystem info

This closes apache#16542
  • Loading branch information
qinjunjerry authored and sjwiesman committed Jul 21, 2021
1 parent 7083bb7 commit f9286a5
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@

/** This output format copies files from an existing savepoint into a new directory. */
@Internal
public final class FileCopyFunction implements OutputFormat<String> {
public final class FileCopyFunction implements OutputFormat<Path> {

private static final long serialVersionUID = 1L;

Expand All @@ -52,8 +52,7 @@ public void open(int taskNumber, int numTasks) throws IOException {
}

@Override
public void writeRecord(String record) throws IOException {
Path sourcePath = new Path(record);
public void writeRecord(Path sourcePath) throws IOException {
Path destPath = new Path(path, sourcePath.getName());
try (FSDataOutputStream os =
destPath.getFileSystem()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,18 +34,18 @@

/** Extracts all file paths that are part of the provided {@link OperatorState}. */
@Internal
public class StatePathExtractor implements FlatMapFunction<OperatorState, String> {
public class StatePathExtractor implements FlatMapFunction<OperatorState, Path> {

private static final long serialVersionUID = 1L;

@Override
public void flatMap(OperatorState operatorState, Collector<String> out) throws Exception {
public void flatMap(OperatorState operatorState, Collector<Path> out) throws Exception {
for (OperatorSubtaskState subTaskState : operatorState.getSubtaskStates().values()) {
// managed operator state
for (OperatorStateHandle operatorStateHandle : subTaskState.getManagedOperatorState()) {
Path path = getStateFilePathFromStreamStateHandle(operatorStateHandle);
if (path != null) {
out.collect(path.getPath());
out.collect(path);
}
}
// managed keyed state
Expand All @@ -55,15 +55,15 @@ public void flatMap(OperatorState operatorState, Collector<String> out) throws E
getStateFilePathFromStreamStateHandle(
(KeyGroupsStateHandle) keyedStateHandle);
if (path != null) {
out.collect(path.getPath());
out.collect(path);
}
}
}
// raw operator state
for (OperatorStateHandle operatorStateHandle : subTaskState.getRawOperatorState()) {
Path path = getStateFilePathFromStreamStateHandle(operatorStateHandle);
if (path != null) {
out.collect(path.getPath());
out.collect(path);
}
}
// raw keyed state
Expand All @@ -73,7 +73,7 @@ public void flatMap(OperatorState operatorState, Collector<String> out) throws E
getStateFilePathFromStreamStateHandle(
(KeyGroupsStateHandle) keyedStateHandle);
if (path != null) {
out.collect(path.getPath());
out.collect(path);
}
}
}
Expand Down

0 comments on commit f9286a5

Please sign in to comment.