diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java
index 45e73feafb31b..317ee55242862 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java
@@ -40,6 +40,7 @@
 import java.io.OutputStream;
 import java.util.Arrays;
 import java.util.Map;
+import java.util.Objects;
 
 /**
  * Implementation of AvroKeyValue writer that can be used in Sink.
@@ -86,7 +87,16 @@ public class AvroKeyValueSinkWriter<K, V> extends StreamWriterBase<Tuple2<K, V>>
 	@SuppressWarnings("deprecation")
 	public AvroKeyValueSinkWriter(Map<String, String> properties) {
 		this.properties = properties;
+		validateProperties();
+	}
+
+	protected AvroKeyValueSinkWriter(AvroKeyValueSinkWriter<K, V> other) {
+		super(other);
+		this.properties = other.properties;
+		validateProperties();
+	}
 
+	private void validateProperties() {
 		String keySchemaString = properties.get(CONF_OUTPUT_KEY_SCHEMA);
 		if (keySchemaString == null) {
 			throw new IllegalStateException("No key schema provided, set '" + CONF_OUTPUT_KEY_SCHEMA + "' property");
@@ -183,7 +193,7 @@ public void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig) {
 
 	@Override
 	public Writer<Tuple2<K, V>> duplicate() {
-		return new AvroKeyValueSinkWriter<K, V>(properties);
+		return new AvroKeyValueSinkWriter<>(this);
 	}
 
 	// taken from m/r avro lib to remove dependency on it
@@ -312,4 +322,26 @@ public static Schema getSchema(Schema keySchema, Schema valueSchema) {
 			return schema;
 		}
 	}
+
+	@Override
+	public int hashCode() {
+		return Objects.hash(super.hashCode(), properties);
+	}
+
+	@Override
+	public boolean equals(Object other) {
+		if (this == other) {
+			return true;
+		}
+		if (other == null) {
+			return false;
+		}
+		if (getClass() != other.getClass()) {
+			return false;
+		}
+		AvroKeyValueSinkWriter<K, V> writer = (AvroKeyValueSinkWriter<K, V>) other;
+		// field comparison
+		return Objects.equals(properties, writer.properties)
+			&& super.equals(other);
+	}
 }
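Note that `validateProperties()` now runs in both constructors, so a writer (and any duplicate of one) fails fast when the mandatory schemas are absent. A minimal configuration sketch under that contract — the `CONF_*` constants and `AvroKeyValueSinkWriter` are from this connector, while the schema choices and the helper class are illustrative only:

```java
import org.apache.avro.Schema;

import java.util.HashMap;
import java.util.Map;

class AvroWriterConfigSketch {
	// Smallest property map that passes validateProperties(): both schemas
	// are mandatory, while the compression settings fall back to defaults.
	static AvroKeyValueSinkWriter<Long, String> newWriter() {
		Map<String, String> properties = new HashMap<>();
		properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_KEY_SCHEMA, Schema.create(Schema.Type.LONG).toString());
		properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_VALUE_SCHEMA, Schema.create(Schema.Type.STRING).toString());
		return new AvroKeyValueSinkWriter<>(properties);
	}
}
```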
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java
index 6f805448aae78..2f42ef74c5211 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java
@@ -34,6 +34,7 @@
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
 
 import java.io.IOException;
+import java.util.Objects;
 
 /**
  * A {@link Writer} that writes the bucket files as Hadoop {@link SequenceFile SequenceFiles}.
@@ -77,6 +78,15 @@ public SequenceFileWriter(String compressionCodecName,
 		this.compressionType = compressionType;
 	}
 
+	protected SequenceFileWriter(SequenceFileWriter<K, V> other) {
+		super(other);
+
+		this.compressionCodecName = other.compressionCodecName;
+		this.compressionType = other.compressionType;
+		this.keyClass = other.keyClass;
+		this.valueClass = other.valueClass;
+	}
+
 	@Override
 	public void open(FileSystem fs, Path path) throws IOException {
 		super.open(fs, path);
@@ -143,9 +153,31 @@ public void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig) {
 
 	@Override
 	public Writer<Tuple2<K, V>> duplicate() {
-		SequenceFileWriter<K, V> result = new SequenceFileWriter<>(compressionCodecName, compressionType);
-		result.keyClass = keyClass;
-		result.valueClass = valueClass;
-		return result;
+		return new SequenceFileWriter<>(this);
+	}
+
+	@Override
+	public int hashCode() {
+		return Objects.hash(super.hashCode(), compressionCodecName, compressionType, keyClass, valueClass);
+	}
+
+	@Override
+	public boolean equals(Object other) {
+		if (this == other) {
+			return true;
+		}
+		if (other == null) {
+			return false;
+		}
+		if (getClass() != other.getClass()) {
+			return false;
+		}
+		SequenceFileWriter<K, V> writer = (SequenceFileWriter<K, V>) other;
+		// field comparison
+		return Objects.equals(compressionCodecName, writer.compressionCodecName)
+			&& Objects.equals(compressionType, writer.compressionType)
+			&& Objects.equals(keyClass, writer.keyClass)
+			&& Objects.equals(valueClass, writer.valueClass)
+			&& super.equals(other);
 	}
 }
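The rewritten `duplicate()` is the heart of the fix: the old version copied `keyClass` and `valueClass` by hand and silently dropped any state added later, such as the new `syncOnFlush` flag in `StreamWriterBase`. Routing duplication through a protected copy constructor that chains to `super(other)` makes the copy complete by construction. A sketch of the same pattern for a hypothetical subclass (`LineWriter` is not part of this patch):

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Hypothetical writer following the copy-constructor pattern this diff
// establishes for StreamWriterBase subclasses.
public class LineWriter extends StreamWriterBase<String> {

	private static final long serialVersionUID = 1L;

	private final String lineSeparator;

	public LineWriter(String lineSeparator) {
		this.lineSeparator = lineSeparator;
	}

	protected LineWriter(LineWriter other) {
		super(other); // carries over base-class state such as syncOnFlush
		this.lineSeparator = other.lineSeparator;
	}

	@Override
	public void write(String element) throws IOException {
		// getStream() is the protected stream accessor StreamWriterBase exposes
		getStream().write((element + lineSeparator).getBytes(StandardCharsets.UTF_8));
	}

	@Override
	public Writer<String> duplicate() {
		return new LineWriter(this);
	}
}
```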
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StreamWriterBase.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StreamWriterBase.java
index 0a3a60362f166..f625ef38905c7 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StreamWriterBase.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StreamWriterBase.java
@@ -23,6 +23,7 @@
 import org.apache.hadoop.fs.Path;
 
 import java.io.IOException;
+import java.util.Objects;
 
 /**
  * Base class for {@link Writer Writers} that write to a {@link FSDataOutputStream}.
@@ -36,6 +37,22 @@ public abstract class StreamWriterBase<T> implements Writer<T> {
 	 */
 	private transient FSDataOutputStream outStream;
 
+	private boolean syncOnFlush;
+
+	public StreamWriterBase() {
+	}
+
+	protected StreamWriterBase(StreamWriterBase<T> other) {
+		this.syncOnFlush = other.syncOnFlush;
+	}
+
+	/**
+	 * Controls whether to sync {@link FSDataOutputStream} on flush.
+	 */
+	public void setSyncOnFlush(boolean syncOnFlush) {
+		this.syncOnFlush = syncOnFlush;
+	}
+
 	/**
 	 * Returns the current output stream, if the stream is open.
 	 */
@@ -59,7 +76,12 @@ public long flush() throws IOException {
 		if (outStream == null) {
 			throw new IllegalStateException("Writer is not open");
 		}
-		outStream.hflush();
+		if (syncOnFlush) {
+			outStream.hsync();
+		}
+		else {
+			outStream.hflush();
+		}
 		return outStream.getPos();
 	}
 
@@ -79,4 +101,25 @@ public void close() throws IOException {
 			outStream = null;
 		}
 	}
+
+	@Override
+	public int hashCode() {
+		return Boolean.hashCode(syncOnFlush);
+	}
+
+	@Override
+	public boolean equals(Object other) {
+		if (this == other) {
+			return true;
+		}
+		if (other == null) {
+			return false;
+		}
+		if (getClass() != other.getClass()) {
+			return false;
+		}
+		StreamWriterBase<T> writer = (StreamWriterBase<T>) other;
+		// field comparison
+		return Objects.equals(syncOnFlush, writer.syncOnFlush);
+	}
 }
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java
index d2ef9d61605dd..5c81b15bf1a0e 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java
@@ -26,6 +26,7 @@
 import java.nio.charset.Charset;
 import java.nio.charset.IllegalCharsetNameException;
 import java.nio.charset.UnsupportedCharsetException;
+import java.util.Objects;
 
 /**
  * A {@link Writer} that uses {@code toString()} on the input elements and writes them to
@@ -58,6 +59,11 @@ public StringWriter(String charsetName) {
 		this.charsetName = charsetName;
 	}
 
+	protected StringWriter(StringWriter<T> other) {
+		super(other);
+		this.charsetName = other.charsetName;
+	}
+
 	@Override
 	public void open(FileSystem fs, Path path) throws IOException {
 		super.open(fs, path);
@@ -82,6 +88,28 @@ public void write(T element) throws IOException {
 
 	@Override
 	public Writer<T> duplicate() {
-		return new StringWriter<>();
+		return new StringWriter<>(this);
+	}
+
+	@Override
+	public int hashCode() {
+		return Objects.hash(super.hashCode(), charsetName);
+	}
+
+	@Override
+	public boolean equals(Object other) {
+		if (this == other) {
+			return true;
+		}
+		if (other == null) {
+			return false;
+		}
+		if (getClass() != other.getClass()) {
+			return false;
+		}
+		StringWriter<T> writer = (StringWriter<T>) other;
+		// field comparison
+		return Objects.equals(charsetName, writer.charsetName)
+			&& super.equals(other);
 	}
 }
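The `syncOnFlush` switch exists because the two HDFS calls give different durability guarantees: `hflush()` only pushes buffered data to the datanodes (visible to readers, but lost if those machines crash before it reaches disk), while `hsync()` additionally forces it to persistent storage, at a throughput cost. A usage sketch, assuming the writer is installed into this connector's `BucketingSink` (an existing API; the output path is made up):

```java
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;

class SyncOnFlushSketch {
	// With syncOnFlush enabled, every flush() issues hsync() (force to disk)
	// instead of hflush() (replicate to datanode memory only).
	static BucketingSink<String> newDurableSink() {
		StringWriter<String> writer = new StringWriter<>();
		writer.setSyncOnFlush(true);
		return new BucketingSink<String>("/base/output/path").setWriter(writer);
	}
}
```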
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriterTest.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriterTest.java
new file mode 100644
index 0000000000000..019e56d66bae4
--- /dev/null
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriterTest.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.fs;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileConstants;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link AvroKeyValueSinkWriter}.
+ */
+public class AvroKeyValueSinkWriterTest {
+
+	@Test
+	public void testDuplicate() {
+		Map<String, String> properties = new HashMap<>();
+		Schema keySchema = Schema.create(Schema.Type.STRING);
+		Schema valueSchema = Schema.create(Schema.Type.STRING);
+		properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_KEY_SCHEMA, keySchema.toString());
+		properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_VALUE_SCHEMA, valueSchema.toString());
+		properties.put(AvroKeyValueSinkWriter.CONF_COMPRESS, String.valueOf(true));
+		properties.put(AvroKeyValueSinkWriter.CONF_COMPRESS_CODEC, DataFileConstants.SNAPPY_CODEC);
+
+		AvroKeyValueSinkWriter<String, String> writer = new AvroKeyValueSinkWriter<>(properties);
+		writer.setSyncOnFlush(true);
+		Writer<Tuple2<String, String>> other = writer.duplicate();
+
+		assertTrue(writer.equals(other));
+
+		writer.setSyncOnFlush(false);
+		assertFalse(writer.equals(other));
+	}
+}
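Since equality now reflects configuration, the same style of check extends naturally to the negative case. A hypothetical extra assertion for this test (not part of the patch), reusing the `properties` map and `writer` above:

```java
// Writers configured with different properties must not compare equal.
Map<String, String> otherProperties = new HashMap<>(properties);
otherProperties.put(AvroKeyValueSinkWriter.CONF_COMPRESS, String.valueOf(false));
assertFalse(writer.equals(new AvroKeyValueSinkWriter<String, String>(otherProperties)));
```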
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriterTest.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriterTest.java
new file mode 100644
index 0000000000000..7ea22649d4970
--- /dev/null
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriterTest.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.fs;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link SequenceFileWriter}.
+ */
+public class SequenceFileWriterTest {
+
+	@Test
+	public void testDuplicate() {
+		SequenceFileWriter<Text, Text> writer = new SequenceFileWriter<>("BZ", SequenceFile.CompressionType.BLOCK);
+		writer.setSyncOnFlush(true);
+		Writer<Tuple2<Text, Text>> other = writer.duplicate();
+
+		assertTrue(writer.equals(other));
+
+		writer.setSyncOnFlush(false);
+		assertFalse(writer.equals(other));
+	}
+}
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/StringWriterTest.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/StringWriterTest.java
new file mode 100644
index 0000000000000..488f860c32773
--- /dev/null
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/StringWriterTest.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.fs;
+
+import org.junit.Test;
+
+import java.nio.charset.StandardCharsets;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link StringWriter}.
+ */
+public class StringWriterTest {
+
+	@Test
+	public void testDuplicate() {
+		StringWriter<String> writer = new StringWriter<>(StandardCharsets.UTF_16.name());
+		writer.setSyncOnFlush(true);
+		Writer<String> other = writer.duplicate();
+
+		assertTrue(writer.equals(other));
+
+		writer.setSyncOnFlush(false);
+		assertFalse(writer.equals(other));
+		assertFalse(writer.equals(new StringWriter<>()));
+	}
+}
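Taken together: `BucketingSink` keeps a template writer and calls `duplicate()` each time it opens a part file, so every configurable field must survive duplication, which is exactly what these tests pin down through the new `equals()`/`hashCode()`. A closing usage sketch under those assumptions ("Gzip" is a standard Hadoop codec name; the path and class name are illustrative):

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;

import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

class DuplicationRoundTripSketch {
	// The sink duplicates this template writer per bucket; after this patch
	// the duplicates keep the codec, key/value types and the syncOnFlush flag.
	static BucketingSink<Tuple2<Text, Text>> newSink() {
		SequenceFileWriter<Text, Text> writer = new SequenceFileWriter<>("Gzip", SequenceFile.CompressionType.BLOCK);
		writer.setSyncOnFlush(true);
		return new BucketingSink<Tuple2<Text, Text>>("/base/output/path").setWriter(writer);
	}
}
```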