Skip to content

Commit

Permalink
[FLINK-8543] Don't call super.close() in AvroKeyValueSinkWriter
Browse files Browse the repository at this point in the history
The call to keyValueWriter.close() in AvroKeyValueSinkWriter.close()
will eventually call flush() on the wrapped stream which fails if we
close it before(). Now we call flush ourselves before closing the
KeyValyeWriter, which internally closes the wrapped stream eventually.
  • Loading branch information
aljoscha committed Feb 26, 2018
1 parent a2d1d08 commit 915213c
Showing 1 changed file with 17 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -150,17 +150,29 @@ private CodecFactory getCompressionCodec(Map<String, String> conf) {
public void open(FileSystem fs, Path path) throws IOException {
super.open(fs, path);

CodecFactory compressionCodec = getCompressionCodec(properties);
Schema keySchema = Schema.parse(properties.get(CONF_OUTPUT_KEY_SCHEMA));
Schema valueSchema = Schema.parse(properties.get(CONF_OUTPUT_VALUE_SCHEMA));
keyValueWriter = new AvroKeyValueWriter<K, V>(keySchema, valueSchema, compressionCodec, getStream());
try {
CodecFactory compressionCodec = getCompressionCodec(properties);
Schema keySchema = Schema.parse(properties.get(CONF_OUTPUT_KEY_SCHEMA));
Schema valueSchema = Schema.parse(properties.get(CONF_OUTPUT_VALUE_SCHEMA));
keyValueWriter = new AvroKeyValueWriter<K, V>(
keySchema,
valueSchema,
compressionCodec,
getStream());
} finally {
if (keyValueWriter == null) {
close();
}
}
}

@Override
public void close() throws IOException {
super.close(); //the order is important since super.close flushes inside
if (keyValueWriter != null) {
keyValueWriter.close();
} else {
// need to make sure we close this if we never created the Key/Value Writer.
super.close();
}
}

Expand Down

0 comments on commit 915213c

Please sign in to comment.