[FLINK-7737][filesystem] Add syncOnFlush flag to StreamWriterBase
Whether to call hsync or hflush on the underlying file system depends on
user preferences. Normally hflush is enough to protect against single-machine
HDFS failures and against TaskManager failures. However, if the user is
writing to an S3-like file system, or wants to protect against a whole HDFS
rack losing power, hsync must be used instead.
pnowojski authored and aljoscha committed Oct 24, 2017
1 parent bc32991 commit 4a6a94d
Showing 7 changed files with 290 additions and 7 deletions.
AvroKeyValueSinkWriter.java
@@ -40,6 +40,7 @@
import java.io.OutputStream;
import java.util.Arrays;
import java.util.Map;
import java.util.Objects;

/**
 * Implementation of AvroKeyValue writer that can be used in Sink.
@@ -86,7 +87,16 @@ public class AvroKeyValueSinkWriter<K, V> extends StreamWriterBase<Tuple2<K, V>>
	@SuppressWarnings("deprecation")
	public AvroKeyValueSinkWriter(Map<String, String> properties) {
		this.properties = properties;
		validateProperties();
	}

	protected AvroKeyValueSinkWriter(AvroKeyValueSinkWriter<K, V> other) {
		super(other);
		this.properties = other.properties;
		validateProperties();
	}

	private void validateProperties() {
		String keySchemaString = properties.get(CONF_OUTPUT_KEY_SCHEMA);
		if (keySchemaString == null) {
			throw new IllegalStateException("No key schema provided, set '" + CONF_OUTPUT_KEY_SCHEMA + "' property");
@@ -183,7 +193,7 @@ public void setInputType(TypeInformation<?> type, ExecutionConfig executionConfi

	@Override
	public Writer<Tuple2<K, V>> duplicate() {
-		return new AvroKeyValueSinkWriter<K, V>(properties);
+		return new AvroKeyValueSinkWriter<>(this);
	}

	// taken from m/r avro lib to remove dependency on it
@@ -312,4 +322,26 @@ public static Schema getSchema(Schema keySchema, Schema valueSchema) {
			return schema;
		}
	}

	@Override
	public int hashCode() {
		return Objects.hash(super.hashCode(), properties);
	}

	@Override
	public boolean equals(Object other) {
		if (this == other) {
			return true;
		}
		if (other == null) {
			return false;
		}
		if (getClass() != other.getClass()) {
			return false;
		}
		AvroKeyValueSinkWriter<K, V> writer = (AvroKeyValueSinkWriter<K, V>) other;
		// field comparison
		return Objects.equals(properties, writer.properties)
			&& super.equals(other);
	}
}
SequenceFileWriter.java
@@ -34,6 +34,7 @@
import org.apache.hadoop.io.compress.CompressionCodecFactory;

import java.io.IOException;
import java.util.Objects;

/**
 * A {@link Writer} that writes the bucket files as Hadoop {@link SequenceFile SequenceFiles}.
@@ -77,6 +78,15 @@ public SequenceFileWriter(String compressionCodecName,
		this.compressionType = compressionType;
	}

	protected SequenceFileWriter(SequenceFileWriter<K, V> other) {
		super(other);

		this.compressionCodecName = other.compressionCodecName;
		this.compressionType = other.compressionType;
		this.keyClass = other.keyClass;
		this.valueClass = other.valueClass;
	}

	@Override
	public void open(FileSystem fs, Path path) throws IOException {
		super.open(fs, path);
@@ -143,9 +153,31 @@ public void setInputType(TypeInformation<?> type, ExecutionConfig executionConfi

	@Override
	public Writer<Tuple2<K, V>> duplicate() {
-		SequenceFileWriter<K, V> result = new SequenceFileWriter<>(compressionCodecName, compressionType);
-		result.keyClass = keyClass;
-		result.valueClass = valueClass;
-		return result;
+		return new SequenceFileWriter<>(this);
	}

	@Override
	public int hashCode() {
		return Objects.hash(super.hashCode(), compressionCodecName, compressionType, keyClass, valueClass);
	}

	@Override
	public boolean equals(Object other) {
		if (this == other) {
			return true;
		}
		if (other == null) {
			return false;
		}
		if (getClass() != other.getClass()) {
			return false;
		}
		SequenceFileWriter<K, V> writer = (SequenceFileWriter<K, V>) other;
		// field comparison
		return Objects.equals(compressionCodecName, writer.compressionCodecName)
			&& Objects.equals(compressionType, writer.compressionType)
			&& Objects.equals(keyClass, writer.keyClass)
			&& Objects.equals(valueClass, writer.valueClass)
			&& super.equals(other);
	}
}
StreamWriterBase.java
@@ -23,6 +23,7 @@
import org.apache.hadoop.fs.Path;

import java.io.IOException;
import java.util.Objects;

/**
 * Base class for {@link Writer Writers} that write to a {@link FSDataOutputStream}.
@@ -36,6 +37,22 @@ public abstract class StreamWriterBase<T> implements Writer<T> {
	 */
	private transient FSDataOutputStream outStream;

	private boolean syncOnFlush;

	public StreamWriterBase() {
	}

	protected StreamWriterBase(StreamWriterBase<T> other) {
		this.syncOnFlush = other.syncOnFlush;
	}

	/**
	 * Controls whether to sync {@link FSDataOutputStream} on flush.
	 */
	public void setSyncOnFlush(boolean syncOnFlush) {
		this.syncOnFlush = syncOnFlush;
	}

	/**
	 * Returns the current output stream, if the stream is open.
	 */
@@ -59,7 +76,12 @@ public long flush() throws IOException {
		if (outStream == null) {
			throw new IllegalStateException("Writer is not open");
		}
-		outStream.hflush();
+		if (syncOnFlush) {
+			outStream.hsync();
+		} else {
+			outStream.hflush();
+		}
		return outStream.getPos();
	}

@@ -79,4 +101,25 @@ public void close() throws IOException {
			outStream = null;
		}
	}

	@Override
	public int hashCode() {
		return Boolean.hashCode(syncOnFlush);
	}

	@Override
	public boolean equals(Object other) {
		if (this == other) {
			return true;
		}
		if (other == null) {
			return false;
		}
		if (getClass() != other.getClass()) {
			return false;
		}
		StreamWriterBase<T> writer = (StreamWriterBase<T>) other;
		// field comparison
		return Objects.equals(syncOnFlush, writer.syncOnFlush);
	}
}
StringWriter.java
@@ -26,6 +26,7 @@
import java.nio.charset.Charset;
import java.nio.charset.IllegalCharsetNameException;
import java.nio.charset.UnsupportedCharsetException;
import java.util.Objects;

/**
 * A {@link Writer} that uses {@code toString()} on the input elements and writes them to
@@ -58,6 +59,11 @@ public StringWriter(String charsetName) {
		this.charsetName = charsetName;
	}

	protected StringWriter(StringWriter<T> other) {
		super(other);
		this.charsetName = other.charsetName;
	}

	@Override
	public void open(FileSystem fs, Path path) throws IOException {
		super.open(fs, path);
@@ -82,6 +88,28 @@ public void write(T element) throws IOException {

	@Override
	public Writer<T> duplicate() {
-		return new StringWriter<>();
+		return new StringWriter<>(this);
	}

	@Override
	public int hashCode() {
		return Objects.hash(super.hashCode(), charsetName);
	}

	@Override
	public boolean equals(Object other) {
		if (this == other) {
			return true;
		}
		if (other == null) {
			return false;
		}
		if (getClass() != other.getClass()) {
			return false;
		}
		StringWriter<T> writer = (StringWriter<T>) other;
		// field comparison
		return Objects.equals(charsetName, writer.charsetName)
			&& super.equals(other);
	}
}
AvroKeyValueSinkWriterTest.java
@@ -0,0 +1,57 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.streaming.connectors.fs;

import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileConstants;
import org.junit.Test;

import java.util.HashMap;
import java.util.Map;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

/**
 * Tests for {@link AvroKeyValueSinkWriter}.
 */
public class AvroKeyValueSinkWriterTest {

	@Test
	public void testDuplicate() {
		Map<String, String> properties = new HashMap<>();
		Schema keySchema = Schema.create(Schema.Type.STRING);
		Schema valueSchema = Schema.create(Schema.Type.STRING);
		properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_KEY_SCHEMA, keySchema.toString());
		properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_VALUE_SCHEMA, valueSchema.toString());
		properties.put(AvroKeyValueSinkWriter.CONF_COMPRESS, String.valueOf(true));
		properties.put(AvroKeyValueSinkWriter.CONF_COMPRESS_CODEC, DataFileConstants.SNAPPY_CODEC);

		AvroKeyValueSinkWriter<String, String> writer = new AvroKeyValueSinkWriter<>(properties);
		writer.setSyncOnFlush(true);
		Writer<Tuple2<String, String>> other = writer.duplicate();

		assertTrue(writer.equals(other));

		writer.setSyncOnFlush(false);
		assertFalse(writer.equals(other));
	}
}
SequenceFileWriterTest.java
@@ -0,0 +1,46 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.streaming.connectors.fs;

import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.junit.Test;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

/**
 * Tests for {@link SequenceFileWriter}.
 */
public class SequenceFileWriterTest {

	@Test
	public void testDuplicate() {
		SequenceFileWriter<Text, Text> writer = new SequenceFileWriter<>("BZ", SequenceFile.CompressionType.BLOCK);
		writer.setSyncOnFlush(true);
		Writer<Tuple2<Text, Text>> other = writer.duplicate();

		assertTrue(writer.equals(other));

		writer.setSyncOnFlush(false);
		assertFalse(writer.equals(other));
	}
}
StringWriterTest.java
@@ -0,0 +1,45 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.streaming.connectors.fs;

import org.junit.Test;

import java.nio.charset.StandardCharsets;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

/**
 * Tests for {@link StringWriter}.
 */
public class StringWriterTest {

	@Test
	public void testDuplicate() {
		StringWriter<String> writer = new StringWriter<>(StandardCharsets.UTF_16.name());
		writer.setSyncOnFlush(true);
		Writer<String> other = writer.duplicate();

		assertTrue(writer.equals(other));

		writer.setSyncOnFlush(false);
		assertFalse(writer.equals(other));
		assertFalse(writer.equals(new StringWriter<>()));
	}
}
