forked from apache/flink
-
Notifications
You must be signed in to change notification settings - Fork 0
/
LocalFileSystem.java
330 lines (278 loc) · 9.67 KB
/
LocalFileSystem.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* Parts of earlier versions of this file were based on source code from the
* Hadoop Project (http://hadoop.apache.org/), licensed by the Apache Software Foundation (ASF)
* under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
* additional information regarding copyright ownership.
*/
package org.apache.flink.core.fs.local;
import org.apache.flink.annotation.Internal;
import org.apache.flink.core.fs.BlockLocation;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.FileSystemKind;
import org.apache.flink.core.fs.Path;
import org.apache.flink.util.OperatingSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.InetAddress;
import java.net.URI;
import java.net.UnknownHostException;
import java.nio.file.AccessDeniedException;
import java.nio.file.DirectoryNotEmptyException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.StandardCopyOption;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
 * The class {@code LocalFileSystem} is an implementation of the {@link FileSystem} interface
 * for the local file system of the machine where the JVM runs.
 *
 * <p>Paths handed to this class are resolved against the working directory when they are not
 * absolute, and are then mapped to {@link java.io.File} objects via the path component of their URI.
 */
@Internal
public class LocalFileSystem extends FileSystem {

    private static final Logger LOG = LoggerFactory.getLogger(LocalFileSystem.class);

    /** The URI representing the local file system. */
    private static final URI LOCAL_URI =
            OperatingSystem.isWindows() ? URI.create("file:/") : URI.create("file:///");

    /** The shared instance of the local file system. */
    private static final LocalFileSystem INSTANCE = new LocalFileSystem();

    /**
     * URI pointing to the working directory (system property {@code user.dir}), read once in the
     * constructor. Because Paths are not immutable, we cannot cache the proper path here; a new
     * {@link Path} is created from this URI on every request.
     */
    private final URI workingDir;

    /**
     * URI pointing to the user's home directory (system property {@code user.home}), read once in
     * the constructor. Because Paths are not immutable, we cannot cache the proper path here; a
     * new {@link Path} is created from this URI on every request.
     */
    private final URI homeDir;

    /** The host name of this machine, or {@code "unknownHost"} if it could not be resolved. */
    private final String hostName;

    /**
     * Constructs a new <code>LocalFileSystem</code> object.
     *
     * <p>Reads the working and home directories from the system properties and tries to resolve
     * the local host name. A failure to resolve the host name is logged and does not fail the
     * construction.
     */
    public LocalFileSystem() {
        this.workingDir = new File(System.getProperty("user.dir")).toURI();
        this.homeDir = new File(System.getProperty("user.home")).toURI();

        String tmp = "unknownHost";
        try {
            tmp = InetAddress.getLocalHost().getHostName();
        } catch (UnknownHostException e) {
            LOG.error("Could not resolve local host", e);
        }
        this.hostName = tmp;
    }

    // ------------------------------------------------------------------------

    /**
     * Returns a single block location that covers the whole file and is attributed to this host.
     * The {@code start} and {@code len} arguments are not used.
     *
     * @param file the status of the file to get the block locations for
     * @param start ignored
     * @param len ignored
     * @return an array with exactly one location, built from the host name and the file length
     */
    @Override
    public BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len) throws IOException {
        return new BlockLocation[] {
            new LocalBlockLocation(hostName, file.getLen())
        };
    }

    /**
     * Returns the status of the file or directory at the given path.
     *
     * @param f the path to look up; relative paths are resolved against the working directory
     * @return the status of the existing file or directory
     * @throws FileNotFoundException if {@link File#exists()} reports that the path does not exist
     */
    @Override
    public FileStatus getFileStatus(Path f) throws IOException {
        final File path = pathToFile(f);
        if (path.exists()) {
            return new LocalFileStatus(path, this);
        }
        else {
            throw new FileNotFoundException("File " + f + " does not exist or the user running "
                    + "Flink ('" + System.getProperty("user.name") + "') has insufficient permissions to access it.");
        }
    }

    /** Returns the URI that represents the local file system. */
    @Override
    public URI getUri() {
        return LOCAL_URI;
    }

    /** Returns a new {@link Path} for the working directory captured at construction time. */
    @Override
    public Path getWorkingDirectory() {
        return new Path(workingDir);
    }

    /** Returns a new {@link Path} for the home directory captured at construction time. */
    @Override
    public Path getHomeDirectory() {
        return new Path(homeDir);
    }

    /**
     * Opens the file at the given path for reading. The {@code bufferSize} argument is not used;
     * this delegates to {@link #open(Path)}.
     */
    @Override
    public FSDataInputStream open(final Path f, final int bufferSize) throws IOException {
        return open(f);
    }

    /**
     * Opens the file at the given path for reading.
     *
     * @param f the path of the file; relative paths are resolved against the working directory
     * @return an input stream on the local file
     */
    @Override
    public FSDataInputStream open(final Path f) throws IOException {
        final File file = pathToFile(f);
        return new LocalDataInputStream(file);
    }

    /**
     * Converts the given path into a local {@link File}. Non-absolute paths are first resolved
     * against {@link #getWorkingDirectory()}; only the path component of the URI is used.
     */
    private File pathToFile(Path path) {
        if (!path.isAbsolute()) {
            path = new Path(getWorkingDirectory(), path);
        }
        return new File(path.toUri().getPath());
    }

    /** Checks whether a file or directory exists at the given path. */
    @Override
    public boolean exists(Path f) throws IOException {
        final File path = pathToFile(f);
        return path.exists();
    }

    /**
     * Lists the statuses of the entries in the given directory, or the status of the file itself
     * if the path denotes a regular file.
     *
     * @param f the path to list
     * @return the statuses of the children (or of the file itself), or {@code null} if the path
     *     does not exist or the directory could not be listed
     * @throws IOException if the status of a listed entry cannot be obtained, for example because
     *     the entry disappeared after the directory was listed
     */
    @Override
    public FileStatus[] listStatus(final Path f) throws IOException {
        final File localf = pathToFile(f);
        FileStatus[] results;

        if (!localf.exists()) {
            return null;
        }
        if (localf.isFile()) {
            return new FileStatus[] { new LocalFileStatus(localf, this) };
        }

        // File.list() returns null if the path is not a directory or an I/O error occurred
        final String[] names = localf.list();
        if (names == null) {
            return null;
        }
        results = new FileStatus[names.length];
        for (int i = 0; i < names.length; i++) {
            results[i] = getFileStatus(new Path(f, names[i]));
        }

        return results;
    }

    /**
     * Deletes the file or directory at the given path.
     *
     * @param f the path to delete
     * @param recursive whether a non-empty directory may be deleted together with its contents
     * @return <code>true</code> if the deletion succeeded, <code>false</code> otherwise
     * @throws IOException if {@code recursive} is <code>false</code> and the path is a directory
     *     that is not empty or whose contents cannot be listed
     */
    @Override
    public boolean delete(final Path f, final boolean recursive) throws IOException {
        final File file = pathToFile(f);
        if (file.isFile()) {
            return file.delete();
        } else if ((!recursive) && file.isDirectory()) {
            // a non-recursive delete is only permitted on an empty directory
            File[] containedFiles = file.listFiles();
            if (containedFiles == null) {
                throw new IOException("Directory " + file.toString() + " does not exist or an I/O error occurred");
            } else if (containedFiles.length != 0) {
                throw new IOException("Directory " + file.toString() + " is not empty");
            }
        }

        return delete(file);
    }

    /**
     * Deletes the given file or directory. Directories are emptied recursively before they are
     * removed; the recursion stops at the first entry that cannot be deleted.
     *
     * @param f
     *        the file to be deleted
     * @return <code>true</code> if all files were deleted successfully, <code>false</code> otherwise
     * @throws IOException
     *         thrown if an error occurred while deleting the files/directories
     */
    private boolean delete(final File f) throws IOException {
        if (f.isDirectory()) {
            final File[] files = f.listFiles();
            if (files != null) {
                for (File file : files) {
                    final boolean del = delete(file);
                    if (!del) {
                        return false;
                    }
                }
            }
        } else {
            return f.delete();
        }

        // Now directory is empty
        return f.delete();
    }

    /**
     * Recursively creates the directory specified by the provided path.
     *
     * @return <code>true</code>if the directories either already existed or have been created successfully,
     *         <code>false</code> otherwise
     * @throws IOException
     *         thrown if an error occurred while creating the directory/directories
     */
    @Override
    public boolean mkdirs(final Path f) throws IOException {
        checkNotNull(f, "path is null");
        return mkdirsInternal(pathToFile(f));
    }

    /**
     * Creates the given directory and all missing parents, one level at a time.
     *
     * @param file the directory to create
     * @return <code>true</code> if the directory exists when this method returns
     * @throws FileAlreadyExistsException if the path (or one of its parents) exists but is not a
     *     directory
     */
    private boolean mkdirsInternal(File file) throws IOException {
        if (file.isDirectory()) {
            return true;
        }
        else if (file.exists() && !file.isDirectory()) {
            // Important: The 'exists()' check above must come before the 'isDirectory()' check to
            //            be safe when multiple parallel instances try to create the directory

            // exists and is not a directory -> is a regular file
            throw new FileAlreadyExistsException(file.getAbsolutePath());
        }
        else {
            File parent = file.getParentFile();
            // mkdir() returns false when a concurrent creator made the directory after the
            // checks above; re-checking isDirectory() treats that outcome as success
            return (parent == null || mkdirsInternal(parent)) && (file.mkdir() || file.isDirectory());
        }
    }

    /**
     * Opens an output stream to the file at the given path, creating missing parent directories.
     *
     * @param filePath the path of the file to write
     * @param overwrite the write mode; with {@code NO_OVERWRITE} an existing file is an error
     * @return an output stream on the local file
     * @throws FileAlreadyExistsException if the file exists and the mode is {@code NO_OVERWRITE}
     * @throws IOException if the parent directories cannot be created
     */
    @Override
    public FSDataOutputStream create(final Path filePath, final WriteMode overwrite) throws IOException {
        checkNotNull(filePath, "filePath");

        if (exists(filePath) && overwrite == WriteMode.NO_OVERWRITE) {
            throw new FileAlreadyExistsException("File already exists: " + filePath);
        }

        final Path parent = filePath.getParent();
        if (parent != null && !mkdirs(parent)) {
            throw new IOException("Mkdirs failed to create " + parent);
        }

        final File file = pathToFile(filePath);
        return new LocalDataOutputStream(file);
    }

    /**
     * Moves {@code src} to {@code dst}, replacing an existing destination and creating the
     * destination's parent directories if necessary.
     *
     * @param src the path to move
     * @param dst the target path
     * @return <code>true</code> if the move succeeded, <code>false</code> if it failed with one of
     *     the regular "move failed" errors (missing source, access denied, non-empty target
     *     directory, security restriction)
     * @throws IOException for any other I/O failure reported by {@link Files#move}
     */
    @Override
    public boolean rename(final Path src, final Path dst) throws IOException {
        final File srcFile = pathToFile(src);
        final File dstFile = pathToFile(dst);

        final File dstParent = dstFile.getParentFile();

        // Files.move fails if the destination directory doesn't exist.
        // The parent is null when the destination has no parent (a file system root); in that
        // case there is nothing to create.
        if (dstParent != null) {
            //noinspection ResultOfMethodCallIgnored -- we don't care if the directory existed or was created
            dstParent.mkdirs();
        }

        try {
            Files.move(srcFile.toPath(), dstFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
            return true;
        }
        catch (NoSuchFileException | AccessDeniedException | DirectoryNotEmptyException | SecurityException ex) {
            // catch the errors that are regular "move failed" exceptions and return false
            return false;
        }
    }

    /** Returns <code>false</code>: the local file system is not a distributed file system. */
    @Override
    public boolean isDistributedFS() {
        return false;
    }

    /** Returns {@link FileSystemKind#FILE_SYSTEM}. */
    @Override
    public FileSystemKind getKind() {
        return FileSystemKind.FILE_SYSTEM;
    }

    // ------------------------------------------------------------------------

    /**
     * Gets the URI that represents the local file system.
     * That URI is {@code "file:/"} on Windows platforms and {@code "file:///"} on other
     * UNIX family platforms.
     *
     * @return The URI that represents the local file system.
     */
    public static URI getLocalFsURI() {
        return LOCAL_URI;
    }

    /**
     * Gets the shared instance of this file system.
     *
     * @return The shared instance of this file system.
     */
    public static LocalFileSystem getSharedInstance() {
        return INSTANCE;
    }
}