Skip to content

Commit

Permalink
[FLINK-24564][formats] Change the default compression to snappy for parquet, avro in table
Browse files Browse the repository at this point in the history

This closes apache#17500
  • Loading branch information
SteNicholas committed Oct 19, 2021
1 parent 639824f commit f75cf38
Show file tree
Hide file tree
Showing 7 changed files with 16 additions and 8 deletions.
2 changes: 1 addition & 1 deletion docs/content.zh/docs/connectors/table/formats/avro.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ Format 参数
<td>可选</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>仅用于 <a href="{{< ref "docs/connectors/table/filesystem" >}}">filesystem</a>,avro 压缩编解码器。默认不压缩。目前支持:deflate、snappy、bzip2、xz。</td>
<td>仅用于 <a href="{{< ref "docs/connectors/table/filesystem" >}}">filesystem</a>,avro 压缩编解码器。默认使用 snappy 压缩。目前支持:null、deflate、snappy、bzip2、xz。</td>
</tr>
</tbody>
</table>
Expand Down
2 changes: 1 addition & 1 deletion docs/content/docs/connectors/table/formats/avro.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ Format Options
<td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>For <a href="{{< ref "docs/connectors/table/filesystem" >}}">Filesystem</a> only, the compression codec for avro. No compression as default. The valid enumerations are: deflate, snappy, bzip2, xz.</td>
<td>For <a href="{{< ref "docs/connectors/table/filesystem" >}}">Filesystem</a> only, the compression codec for avro. Snappy compression is used by default. The valid enumerations are: null, deflate, snappy, bzip2, xz.</td>
</tr>
</tbody>
</table>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,16 @@
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

import static org.apache.avro.file.DataFileConstants.SNAPPY_CODEC;

/** Options for the avro format. */
@PublicEvolving
public class AvroFormatOptions {

public static final ConfigOption<String> AVRO_OUTPUT_CODEC =
ConfigOptions.key("codec")
.stringType()
.noDefaultValue()
.defaultValue(SNAPPY_CODEC)
.withDescription("The compression codec for avro");

private AvroFormatOptions() {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import java.util.Collection;
import java.util.List;

import static org.apache.avro.file.DataFileConstants.NULL_CODEC;

/** ITCase to test avro format for {@link AvroFileSystemFormatFactory} in batch mode. */
@RunWith(Parameterized.class)
public class AvroFilesystemITCase extends BatchFileSystemITCaseBase {
Expand All @@ -48,7 +50,7 @@ public String[] formatProperties() {
// Builds the WITH-clause properties for the avro filesystem table.
List<String> ret = new ArrayList<>();
ret.add("'format'='avro'");
if (configure) {
// With snappy now the format's default codec (see AvroFormatOptions),
// the "configured" variant pins the null (no-compression) codec so the
// non-default path is still exercised.
ret.add(String.format("'avro.codec'='%s'", NULL_CODEC));
}
return ret.toArray(new String[0]);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ public class AvroFilesystemStreamITCase extends FsStreamingSinkITCaseBase {
/**
 * Returns the extra WITH-clause properties for the streaming sink test: only
 * the avro format itself; the codec is left at the format's default.
 */
public String[] additionalProperties() {
    String[] props = new String[1];
    props[0] = "'format'='avro'";
    return props;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@
import org.apache.flink.table.types.logical.RowType;

import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.hadoop.ParquetOutputFormat;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.api.WriteSupport;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.io.OutputFile;
import org.apache.parquet.io.api.RecordConsumer;
import org.apache.parquet.schema.MessageType;
Expand All @@ -43,7 +45,6 @@
import static org.apache.parquet.hadoop.ParquetOutputFormat.getPageSize;
import static org.apache.parquet.hadoop.ParquetOutputFormat.getValidation;
import static org.apache.parquet.hadoop.ParquetOutputFormat.getWriterVersion;
import static org.apache.parquet.hadoop.codec.CodecConfig.getParquetCompressionCodec;

/** {@link RowData} of {@link ParquetWriter.Builder}. */
public class ParquetRowDataBuilder extends ParquetWriter.Builder<RowData, ParquetRowDataBuilder> {
Expand Down Expand Up @@ -122,7 +123,11 @@ public FlinkParquetBuilder(RowType rowType, Configuration conf, boolean utcTimes
public ParquetWriter<RowData> createWriter(OutputFile out) throws IOException {
Configuration conf = configuration.conf();
return new ParquetRowDataBuilder(out, rowType, utcTimestamp)
.withCompressionCodec(getParquetCompressionCodec(conf))
.withCompressionCodec(
CompressionCodecName.fromConf(
conf.get(
ParquetOutputFormat.COMPRESSION,
CompressionCodecName.SNAPPY.name())))
.withRowGroupSize(getBlockSize(conf))
.withPageSize(getPageSize(conf))
.withDictionaryPageSize(getDictionaryPageSize(conf))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ public void testNonPartition() {
footer.getBlocks().get(0).getColumns().get(0).getCodec().toString());
} else {
Assert.assertEquals(
"UNCOMPRESSED",
"SNAPPY",
footer.getBlocks().get(0).getColumns().get(0).getCodec().toString());
}
} catch (IOException e) {
Expand Down

0 comments on commit f75cf38

Please sign in to comment.