forked from apache/flink
-
Notifications
You must be signed in to change notification settings - Fork 0
/
FileOutputFormat.java
429 lines (335 loc) · 12.8 KB
/
FileOutputFormat.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
/***********************************************************************************************************************
* Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
**********************************************************************************************************************/
package eu.stratosphere.api.common.io;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import eu.stratosphere.api.common.operators.FileDataSink;
import eu.stratosphere.configuration.ConfigConstants;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.configuration.GlobalConfiguration;
import eu.stratosphere.core.fs.FSDataOutputStream;
import eu.stratosphere.core.fs.FileSystem;
import eu.stratosphere.core.fs.FileSystem.WriteMode;
import eu.stratosphere.core.fs.Path;
/**
* The abstract base class for all output formats that are file based. Contains the logic to open/close the target
* file streams.
*/
public abstract class FileOutputFormat<IT> implements OutputFormat<IT> {
private static final long serialVersionUID = 1L;
// --------------------------------------------------------------------------------------------
/**
* Defines the behavior for creating output directories.
*
*/
public static enum OutputDirectoryMode {
/** A directory is always created, regardless of number of write tasks. */
ALWAYS,
/** A directory is only created for parallel output tasks, i.e., number of output tasks > 1.
* If number of output tasks = 1, the output is written to a single file. */
PARONLY
}
// --------------------------------------------------------------------------------------------
private static final WriteMode DEFAULT_WRITE_MODE;
private static final OutputDirectoryMode DEFAULT_OUTPUT_DIRECTORY_MODE;
static {
final boolean overwrite = GlobalConfiguration.getBoolean(ConfigConstants.FILESYSTEM_DEFAULT_OVERWRITE_KEY,
ConfigConstants.DEFAULT_FILESYSTEM_OVERWRITE);
DEFAULT_WRITE_MODE = overwrite ? WriteMode.OVERWRITE : WriteMode.CREATE;
final boolean alwaysCreateDirectory = GlobalConfiguration.getBoolean(ConfigConstants.FILESYSTEM_OUTPUT_ALWAYS_CREATE_DIRECTORY_KEY,
ConfigConstants.DEFAULT_FILESYSTEM_ALWAYS_CREATE_DIRECTORY);
DEFAULT_OUTPUT_DIRECTORY_MODE = alwaysCreateDirectory ? OutputDirectoryMode.ALWAYS : OutputDirectoryMode.PARONLY;
}
// --------------------------------------------------------------------------------------------
/**
* The LOG for logging messages in this class.
*/
private static final Log LOG = LogFactory.getLog(FileOutputFormat.class);
/**
* The key under which the name of the target path is stored in the configuration.
*/
public static final String FILE_PARAMETER_KEY = "stratosphere.output.file";
/**
* The path of the file to be written.
*/
protected Path outputFilePath;
/**
* The write mode of the output.
*/
private WriteMode writeMode;
/**
* The output directory mode
*/
private OutputDirectoryMode outputDirectoryMode;
/**
* Stream opening timeout.
*/
private long openTimeout = -1;
// --------------------------------------------------------------------------------------------
/**
* The stream to which the data is written;
*/
protected transient FSDataOutputStream stream;
// --------------------------------------------------------------------------------------------
@Override
public void configure(Configuration parameters) {
// get the output file path, if it was not yet set
if (this.outputFilePath == null) {
// get the file parameter
String filePath = parameters.getString(FILE_PARAMETER_KEY, null);
if (filePath == null) {
throw new IllegalArgumentException("The output path has been specified neither via constructor/setters" +
", nor via the Configuration.");
}
try {
this.outputFilePath = new Path(filePath);
}
catch (RuntimeException rex) {
throw new RuntimeException("Could not create a valid URI from the given file path name: " + rex.getMessage());
}
}
// check if have not been set and use the defaults in that case
if (this.writeMode == null) {
this.writeMode = DEFAULT_WRITE_MODE;
}
if (this.outputDirectoryMode == null) {
this.outputDirectoryMode = DEFAULT_OUTPUT_DIRECTORY_MODE;
}
if (this.openTimeout == -1) {
this.openTimeout = FileInputFormat.DEFAULT_OPENING_TIMEOUT;
}
}
@Override
public void open(int taskNumber, int numTasks) throws IOException {
if (LOG.isDebugEnabled())
LOG.debug("Openint stream for output (" + (taskNumber+1) + "/" + numTasks + "). WriteMode=" + writeMode +
", OutputDirectoryMode=" + outputDirectoryMode + ", timeout=" + openTimeout);
// obtain FSDataOutputStream asynchronously, since HDFS client is vulnerable to InterruptedExceptions
OutputPathOpenThread opot = new OutputPathOpenThread(this, (taskNumber + 1), numTasks);
opot.start();
try {
// get FSDataOutputStream
this.stream = opot.waitForCompletion();
}
catch (Exception e) {
throw new RuntimeException("Stream to output file could not be opened: " + e.getMessage(), e);
}
}
@Override
public void close() throws IOException {
final FSDataOutputStream s = this.stream;
if (s != null) {
this.stream = null;
s.close();
}
}
public void setOutputFilePath(Path path) {
if (path == null)
throw new IllegalArgumentException("Output file path may not be null.");
this.outputFilePath = path;
}
public Path getOutputFilePath() {
return this.outputFilePath;
}
public void setWriteMode(WriteMode mode) {
if (mode == null) {
throw new NullPointerException();
}
this.writeMode = mode;
}
public WriteMode getWriteMode() {
return this.writeMode;
}
public void setOutputDirectoryMode(OutputDirectoryMode mode) {
if (mode == null) {
throw new NullPointerException();
}
this.outputDirectoryMode = mode;
}
public OutputDirectoryMode getOutputDirectoryMode() {
return this.outputDirectoryMode;
}
public void setOpenTimeout(long timeout) {
if (timeout < 0) {
throw new IllegalArgumentException("The timeout must be a nonnegative numer of milliseconds (zero for infinite).");
}
this.openTimeout = (timeout == 0) ? Long.MAX_VALUE : timeout;
}
public long getOpenTimeout() {
return this.openTimeout;
}
// ============================================================================================
private static final class OutputPathOpenThread extends Thread {
private final Path path;
private final int taskIndex;
private final int numTasks;
private final WriteMode writeMode;
private final OutputDirectoryMode outDirMode;
private final long timeoutMillies;
private volatile FSDataOutputStream fdos;
private volatile Throwable error;
private volatile boolean aborted;
public OutputPathOpenThread(FileOutputFormat<?> fof, int taskIndex, int numTasks) {
this.path = fof.getOutputFilePath();
this.writeMode = fof.getWriteMode();
this.outDirMode = fof.getOutputDirectoryMode();
this.timeoutMillies = fof.getOpenTimeout();
this.taskIndex = taskIndex;
this.numTasks = numTasks;
}
@Override
public void run() {
try {
Path p = this.path;
final FileSystem fs = p.getFileSystem();
// initialize output path.
if(this.numTasks == 1 && outDirMode == OutputDirectoryMode.PARONLY) {
// output is not written in parallel and should go to a single file
if(!fs.isDistributedFS()) {
// prepare local output path
// checks for write mode and removes existing files in case of OVERWRITE mode
if(!fs.initOutPathLocalFS(p, writeMode, false)) {
// output preparation failed! Cancel task.
throw new IOException("Output path could not be initialized. Canceling task.");
}
}
} else if(this.numTasks > 1 || outDirMode == OutputDirectoryMode.ALWAYS) {
// output is written in parallel into a directory or should always be written to a directory
if(!fs.isDistributedFS()) {
// File system is not distributed.
// We need to prepare the output path on each executing node.
if(!fs.initOutPathLocalFS(p, writeMode, true)) {
// output preparation failed! Cancel task.
throw new IOException("Output directory could not be created. Canceling task.");
}
}
// Suffix the path with the parallel instance index
p = p.suffix("/" + this.taskIndex);
} else {
// invalid number of subtasks (<= 0)
throw new IllegalArgumentException("Invalid number of subtasks. Canceling task.");
}
// create output file
switch(writeMode) {
case CREATE:
this.fdos = fs.create(p, false);
break;
case OVERWRITE:
this.fdos = fs.create(p, true);
break;
default:
throw new IllegalArgumentException("Invalid write mode: "+writeMode);
}
// check for canceling and close the stream in that case, because no one will obtain it
if (this.aborted) {
final FSDataOutputStream f = this.fdos;
this.fdos = null;
f.close();
}
}
catch (Throwable t) {
this.error = t;
}
}
public FSDataOutputStream waitForCompletion() throws Exception {
final long start = System.currentTimeMillis();
long remaining = this.timeoutMillies;
do {
try {
this.join(remaining);
} catch (InterruptedException iex) {
// we were canceled, so abort the procedure
abortWait();
throw iex;
}
}
while (this.error == null && this.fdos == null &&
(remaining = this.timeoutMillies + start - System.currentTimeMillis()) > 0);
if (this.error != null) {
throw new IOException("Opening the file output stream failed" +
(this.error.getMessage() == null ? "." : ": " + this.error.getMessage()), this.error);
}
if (this.fdos != null) {
return this.fdos;
} else {
// double-check that the stream has not been set by now. we don't know here whether
// a) the opener thread recognized the canceling and closed the stream
// b) the flag was set such that the stream did not see it and we have a valid stream
// In any case, close the stream and throw an exception.
abortWait();
final boolean stillAlive = this.isAlive();
final StringBuilder bld = new StringBuilder(256);
for (StackTraceElement e : this.getStackTrace()) {
bld.append("\tat ").append(e.toString()).append('\n');
}
throw new IOException("Output opening request timed out. Opener was " + (stillAlive ? "" : "NOT ") +
" alive. Stack:\n" + bld.toString());
}
}
/**
* Double checked procedure setting the abort flag and closing the stream.
*/
private final void abortWait() {
this.aborted = true;
final FSDataOutputStream outStream = this.fdos;
this.fdos = null;
if (outStream != null) {
try {
outStream.close();
} catch (Throwable t) {}
}
}
}
// ============================================================================================
/**
* Creates a configuration builder that can be used to set the input format's parameters to the config in a fluent
* fashion.
*
* @return A config builder for setting parameters.
*/
public static ConfigBuilder configureFileFormat(FileDataSink target) {
return new ConfigBuilder(target.getParameters());
}
/**
* A builder used to set parameters to the output format's configuration in a fluent way.
*/
public static abstract class AbstractConfigBuilder<T> {
/**
* The configuration into which the parameters will be written.
*/
protected final Configuration config;
// --------------------------------------------------------------------
/**
* Creates a new builder for the given configuration.
*
* @param targetConfig The configuration into which the parameters will be written.
*/
protected AbstractConfigBuilder(Configuration targetConfig) {
this.config = targetConfig;
}
}
/**
* A builder used to set parameters to the input format's configuration in a fluent way.
*/
public static class ConfigBuilder extends AbstractConfigBuilder<ConfigBuilder> {
/**
* Creates a new builder for the given configuration.
*
* @param targetConfig The configuration into which the parameters will be written.
*/
protected ConfigBuilder(Configuration targetConfig) {
super(targetConfig);
}
}
}