[FLINK-11427][formats] Add protobuf parquet support for StreamingFileSink

This reverts commit a9436d6.
Gao Yun authored and aljoscha committed Oct 12, 2020
1 parent fdea3cd commit b358926
Showing 7 changed files with 362 additions and 0 deletions.
37 changes: 37 additions & 0 deletions docs/dev/connectors/streamfile_sink.md
@@ -205,6 +205,43 @@ input.addSink(sink)
</div>
</div>

Similarly, a StreamingFileSink that writes Protobuf data to Parquet format can be created like this:

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.formats.parquet.protobuf.ParquetProtoWriters;

// ProtoRecord is a generated protobuf Message class.
DataStream<ProtoRecord> stream = ...;

final StreamingFileSink<ProtoRecord> sink = StreamingFileSink
.forBulkFormat(outputBasePath, ParquetProtoWriters.forType(ProtoRecord.class))
.build();

stream.addSink(sink);

{% endhighlight %}
</div>
<div data-lang="scala" markdown="1">
{% highlight scala %}
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.formats.parquet.protobuf.ParquetProtoWriters

// ProtoRecord is a generated protobuf Message class.
val input: DataStream[ProtoRecord] = ...

val sink: StreamingFileSink[ProtoRecord] = StreamingFileSink
.forBulkFormat(outputBasePath, ParquetProtoWriters.forType(classOf[ProtoRecord]))
.build()

input.addSink(sink)

{% endhighlight %}
</div>
</div>

#### Avro format

Flink also provides built-in support for writing data into Avro files. A list of convenience methods to create
37 changes: 37 additions & 0 deletions docs/dev/connectors/streamfile_sink.zh.md
@@ -190,6 +190,43 @@ input.addSink(sink)
</div>
</div>

Similarly, Protobuf data can be written to Parquet format like this:

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.formats.parquet.protobuf.ParquetProtoWriters;

// ProtoRecord is a generated protobuf Message class.
DataStream<ProtoRecord> stream = ...;

final StreamingFileSink<ProtoRecord> sink = StreamingFileSink
.forBulkFormat(outputBasePath, ParquetProtoWriters.forType(ProtoRecord.class))
.build();

stream.addSink(sink);

{% endhighlight %}
</div>
<div data-lang="scala" markdown="1">
{% highlight scala %}
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.formats.parquet.protobuf.ParquetProtoWriters

// ProtoRecord is a generated protobuf Message class.
val input: DataStream[ProtoRecord] = ...

val sink: StreamingFileSink[ProtoRecord] = StreamingFileSink
.forBulkFormat(outputBasePath, ParquetProtoWriters.forType(classOf[ProtoRecord]))
.build()

input.addSink(sink)

{% endhighlight %}
</div>
</div>

#### Avro Format

Flink also provides built-in support for writing data into Avro files. For the convenience methods that create an AvroWriterFactory, more information can be found in
81 changes: 81 additions & 0 deletions flink-formats/flink-parquet/pom.xml
@@ -96,12 +96,24 @@ under the License.
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
</exclusion>
</exclusions>
</dependency>

<!-- For now, fastutil is provided already by flink-runtime -->
@@ -131,6 +143,19 @@ under the License.
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-protobuf</artifactId>
<version>${flink.format.parquet.version}</version>
<optional>true</optional>
<exclusions>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
</exclusion>
</exclusions>
</dependency>

<!-- test dependencies -->

<dependency>
@@ -172,6 +197,14 @@ under the License.
</dependencies>

<build>
<extensions>
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.5.0.Final</version>
</extension>
</extensions>

<plugins>
<!-- Generate Test class from avro schema -->
<plugin>
@@ -192,6 +225,54 @@ under the License.
</executions>
</plugin>

<!-- Generate Test class from protobuf schema -->
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.5.1</version>
<extensions>true</extensions>
<configuration>
<!-- Currently the Flink Azure test pipeline first pre-compiles the project and uploads the compiled
directory, then downloads that directory and runs the corresponding tests. However, the protoc
executable under the target directory loses its execution permission bit after the download.
To work around this, generation of the target files is skipped if they already exist after
downloading. -->
<checkStaleness>true</checkStaleness>
<protoTestSourceRoot>${project.basedir}/src/test/resources/protobuf</protoTestSourceRoot>
<!-- Generates classes into a separate directory since the generator always removes existing files. -->
<outputDirectory>${project.build.directory}/generated-test-sources/protobuf/java</outputDirectory>
<protocArtifact>com.google.protobuf:protoc:3.5.1:exe:${os.detected.classifier}</protocArtifact>
</configuration>
<executions>
<execution>
<phase>generate-sources</phase>
<goals>
<goal>test-compile</goal>
</goals>
</execution>
</executions>
</plugin>

<!-- Adding protobuf generated classes to test build path -->
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<executions>
<execution>
<id>add-test-source</id>
<phase>generate-sources</phase>
<goals>
<goal>add-test-source</goal>
</goals>
<configuration>
<sources>
<source>${project.build.directory}/generated-test-sources/protobuf/java</source>
</sources>
</configuration>
</execution>
</executions>
</plugin>

<!-- skip dependency convergence due to Hadoop dependency -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
74 changes: 74 additions & 0 deletions flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/protobuf/ParquetProtoWriters.java
@@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.formats.parquet.protobuf;

import org.apache.flink.formats.parquet.ParquetBuilder;
import org.apache.flink.formats.parquet.ParquetWriterFactory;

import com.google.protobuf.Message;
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.api.WriteSupport;
import org.apache.parquet.io.OutputFile;
import org.apache.parquet.proto.ProtoWriteSupport;

/**
* Convenience builder for creating {@link ParquetWriterFactory} instances for Protobuf classes.
*/
public class ParquetProtoWriters {

/**
* Creates a {@link ParquetWriterFactory} for the given type. The type should represent a Protobuf message.
*
* @param type The class of the type to write.
*/
public static <T extends Message> ParquetWriterFactory<T> forType(Class<T> type) {
ParquetBuilder<T> builder = (out) -> new ParquetProtoWriterBuilder<>(out, type).build();
return new ParquetWriterFactory<>(builder);
}

// ------------------------------------------------------------------------

/**
* The builder for Protobuf {@link ParquetWriter}.
*/
private static class ParquetProtoWriterBuilder<T extends Message>
extends ParquetWriter.Builder<T, ParquetProtoWriterBuilder<T>> {

private final Class<T> clazz;

protected ParquetProtoWriterBuilder(OutputFile outputFile, Class<T> clazz) {
super(outputFile);
this.clazz = clazz;
}

@Override
protected ParquetProtoWriterBuilder<T> self() {
return this;
}

@Override
protected WriteSupport<T> getWriteSupport(Configuration conf) {
return new ProtoWriteSupport<>(clazz);
}
}

/** Class is not meant to be instantiated. */
private ParquetProtoWriters() {}
}
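
For reference, below is a minimal sketch (not part of this commit) of how the new factory could be exercised outside a running job, by writing a single record through a `BulkWriter` to a local file. The `ProtoRecord` class and its `setId` field are hypothetical stand-ins for any generated protobuf `Message`; as the pom changes above indicate, a Hadoop dependency still needs to be on the classpath.

```java
// Hypothetical sketch: writing one record with the factory added by this commit.
// ProtoRecord (and its setId field) is a placeholder for any generated protobuf Message class.
import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.ParquetWriterFactory;
import org.apache.flink.formats.parquet.protobuf.ParquetProtoWriters;

public class ParquetProtoWritersSketch {

    public static void main(String[] args) throws Exception {
        // The factory is a BulkWriter.Factory<ProtoRecord>, so it can also be handed
        // to StreamingFileSink.forBulkFormat(...) as shown in the documentation changes above.
        ParquetWriterFactory<ProtoRecord> factory = ParquetProtoWriters.forType(ProtoRecord.class);

        Path target = new Path("/tmp/proto-records.parquet");
        try (FSDataOutputStream out =
                FileSystem.getLocalFileSystem().create(target, FileSystem.WriteMode.OVERWRITE)) {
            BulkWriter<ProtoRecord> writer = factory.create(out);
            writer.addElement(ProtoRecord.newBuilder().setId(1L).build());
            writer.finish();
        }
    }
}
```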