[FLINK-2170] [connectors] Add OrcRowInputFormat and OrcTableSource.
This closes apache#5043.
fhueske committed Nov 22, 2017
1 parent edbf8c9 commit 200612e
Showing 25 changed files with 3,306 additions and 18,512 deletions.
49 changes: 49 additions & 0 deletions docs/dev/table/sourceSinks.md
@@ -47,6 +47,7 @@ A custom `TableSource` can be defined by implementing the `BatchTableSource` or
| `Kafka08AvroTableSource` | `flink-connector-kafka-0.8` | N | Y | A `TableSource` for Avro-encoded Kafka 0.8 topics.
| `Kafka08JsonTableSource` | `flink-connector-kafka-0.8` | N | Y | A `TableSource` for flat Json-encoded Kafka 0.8 topics.
| `CsvTableSource` | `flink-table` | Y | Y | A simple `TableSource` for CSV files.
| `OrcTableSource` | `flink-orc` | Y | N | A `TableSource` for ORC files.

All sources that come with the `flink-table` dependency are directly available for Table API or SQL programs. For all other table sources, you have to add the respective dependency in addition to the `flink-table` dependency.
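
For example, to use the `OrcTableSource` you would add the `flink-orc` dependency listed in the table above to your project. A sketch of the Maven coordinates, assuming the docs' `{{ site.version }}` placeholder and noting that the artifact may carry a Scala-version suffix depending on the build:

{% highlight xml %}
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-orc</artifactId>
  <version>{{ site.version }}</version>
</dependency>
{% endhighlight %}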

@@ -485,6 +486,54 @@ val csvTableSource = CsvTableSource

{% top %}

### OrcTableSource

The `OrcTableSource` reads [ORC files](https://orc.apache.org). ORC is a file format for structured data that stores its content in a compressed, columnar representation. It is very storage-efficient and supports projection and filter push-down.

An `OrcTableSource` is created as shown below:

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}

// create Hadoop Configuration
Configuration config = new Configuration();

OrcTableSource orcTableSource = OrcTableSource.builder()
// path to ORC file(s)
.path("file:https:///path/to/data")
// schema of ORC files
.forOrcSchema("struct<name:string,addresses:array<struct<street:string,zip:smallint>>>")
// Hadoop configuration
.withConfiguration(config)
// build OrcTableSource
.build();
{% endhighlight %}
</div>

<div data-lang="scala" markdown="1">
{% highlight scala %}

// create Hadoop Configuration
val config = new Configuration()

val orcTableSource = OrcTableSource.builder()
// path to ORC file(s)
.path("file:https:///path/to/data")
// schema of ORC files
.forOrcSchema("struct<name:string,addresses:array<struct<street:string,zip:smallint>>>")
// Hadoop configuration
.withConfiguration(config)
// build OrcTableSource
.build()
{% endhighlight %}
</div>
</div>

**Note:** The `OrcTableSource` does not support ORC's `Union` type yet.
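
Once built, an `OrcTableSource` is used like any other `TableSource`: register it with a `TableEnvironment` and query it via the Table API or SQL. A minimal Java sketch, assuming the `orcTableSource` from the example above and a table name `"OrcTable"` chosen here for illustration:

{% highlight java %}
// obtain a batch TableEnvironment
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// register the OrcTableSource under a table name
tableEnv.registerTableSource("OrcTable", orcTableSource);

// query the ORC data with the Table API
Table result = tableEnv.scan("OrcTable").select("name");
{% endhighlight %}

Since the source supports projection and filter push-down, the optimizer forwards selected fields and applicable predicates to the ORC reader; no additional user code is required.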

{% top %}
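
As the commit title notes, this change also adds `OrcRowInputFormat`, which backs the `OrcTableSource` and can be used directly with the DataSet API. A minimal sketch, assuming the input format is constructed from the file path, the ORC schema string, and a Hadoop `Configuration`, mirroring the builder parameters above:

{% highlight java %}
// create Hadoop Configuration
Configuration config = new Configuration();

// read ORC files into a DataSet of Rows
OrcRowInputFormat inputFormat = new OrcRowInputFormat(
  "file:///path/to/data",
  "struct<name:string,addresses:array<struct<street:string,zip:smallint>>>",
  config);

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Row> rows = env.createInput(inputFormat);
{% endhighlight %}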

Provided TableSinks
-------------------

89 changes: 24 additions & 65 deletions flink-connectors/flink-orc/pom.xml
@@ -25,7 +25,7 @@ under the License.
<parent>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connectors</artifactId>
<version>1.4-SNAPSHOT</version>
<version>1.5-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>

@@ -40,22 +40,39 @@ under the License.

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table_${scala.binary.version}</artifactId>
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>compile</scope>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
<artifactId>flink-table_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>compile</scope>
<scope>provided</scope>
<!-- Projects depending on this project, won't depend on flink-table. -->
<optional>true</optional>
</dependency>

<dependency>
<groupId>org.apache.orc</groupId>
<artifactId>orc-core</artifactId>
<version>1.4.0</version>
<version>1.4.1</version>
<exclusions>
<!-- Exclude ORC's Hadoop dependency and pull in Flink's shaded Hadoop. -->
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
</exclusion>
</exclusions>
</dependency>

<!-- Replacement for ORC's Hadoop dependency. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-hadoop2</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>

<!-- test dependencies -->
Expand Down Expand Up @@ -88,65 +105,7 @@ under the License.
<scope>test</scope>
<type>test-jar</type>
</dependency>
</dependencies>

<build>

<pluginManagement>
<plugins>
<!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself.-->
<plugin>
<groupId>org.eclipse.m2e</groupId>
<artifactId>lifecycle-mapping</artifactId>
<version>1.0.0</version>
<configuration>
<lifecycleMappingMetadata>
<pluginExecutions>
<pluginExecution>
<pluginExecutionFilter>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<versionRange>[2.4,)</versionRange>
<goals>
<goal>single</goal>
</goals>
</pluginExecutionFilter>
<action>
<ignore/>
</action>
</pluginExecution>
<pluginExecution>
<pluginExecutionFilter>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-clean-plugin</artifactId>
<versionRange>[1,)</versionRange>
<goals>
<goal>clean</goal>
</goals>
</pluginExecutionFilter>
<action>
<ignore/>
</action>
</pluginExecution>
<pluginExecution>
<pluginExecutionFilter>
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
<versionRange>[1.7.7,)</versionRange>
<goals>
<goal>schema</goal>
</goals>
</pluginExecutionFilter>
<action>
<ignore/>
</action>
</pluginExecution>
</pluginExecutions>
</lifecycleMappingMetadata>
</configuration>
</plugin>
</plugins>
</pluginManagement>
</build>
</dependencies>

</project>