[FLINK-24687][table][connectors] Move FileSystemTableSink, FileSystemTableSource to flink-connector-files and columnar support to flink-table-common

Table packages no longer depend on flink-connector-files. Fix the ORC and Parquet formats to use only common classes and no planner or runtime classes.

- [connector-files] Add @Internal to all public classes and interfaces
- [orc][parquet][hive] Drop the Scala suffix from flink-orc and flink-parquet (see the dependency sketch after this list)
- [architecture-tests] Update the violations file
- [connector-elasticsearch-base] Add flink-connector-base as dependency, which was previously brought in through flink-table-api-java-bridge -> flink-table-api-java -> flink-table-common -> flink-connector-files -> flink-connector-base.
- [orc][parquet] Add issue link for partition keys handling
- [table-uber][dist] flink-connector-files is no longer shaded into flink-table-uber; instead, the distribution ships it as a separate flink-connector-files jar in /lib
- [docs] Update sql_connectors.yml
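
Dropping the Scala suffix changes the Maven coordinates of the format artifacts. A minimal sketch of the new style of dependency declaration (artifact IDs taken from this commit; the `${flink.version}` property is an assumed placeholder for the user's Flink version):

```xml
<!-- Before this commit the artifact carried a Scala suffix (e.g. flink-parquet_2.12); now it does not. -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-parquet</artifactId>
    <version>${flink.version}</version>
</dependency>
```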

This closes apache#17897.
slinkydeveloper authored and twalthr committed Dec 3, 2021
1 parent 6bb0907 commit 9bbadb9
Showing 262 changed files with 1,041 additions and 690 deletions.
1 change: 1 addition & 0 deletions docs/content/docs/connectors/table/filesystem.md
@@ -30,6 +30,7 @@ This connector provides access to partitioned files in filesystems
 supported by the [Flink FileSystem abstraction]({{< ref "docs/deployment/filesystems/overview" >}}).

 The file system connector itself is included in Flink and does not require an additional dependency.
+The corresponding jar can be found in the Flink distribution inside the `/lib` directory.
 A corresponding format needs to be specified for reading and writing rows from and to a file system.

 The file system connector allows for reading and writing from a local or distributed filesystem. A filesystem table can be defined as:
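
At this point the docs page shows a table definition. A minimal sketch of such a DDL (illustrative only; the table name, columns, and path below are assumed placeholders, not content from this commit):

```sql
CREATE TABLE fs_table (
  user_id STRING,
  order_amount DOUBLE,
  dt STRING
) PARTITIONED BY (dt) WITH (
  'connector' = 'filesystem',       -- the built-in file system connector
  'path' = 'file:///tmp/fs_table',  -- assumed example path
  'format' = 'csv'                  -- any supported format, e.g. csv, json, parquet, orc
);
```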
46 changes: 26 additions & 20 deletions docs/data/sql_connectors.yml
@@ -50,15 +50,15 @@ avro-confluent:

 orc:
   name: ORC
-  maven: flink-orc$scala_version
+  maven: flink-orc
   category: format
-  sql_url: https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-orc$scala_version/$version/flink-sql-orc$scala_version-$version.jar
+  sql_url: https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-orc/$version/flink-sql-orc-$version.jar

 parquet:
   name: Parquet
-  maven: flink-parquet$scala_version
+  maven: flink-parquet
   category: format
-  sql_url: https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-parquet$scala_version/$version/flink-sql-parquet$scala_version-$version.jar
+  sql_url: https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-parquet/$version/flink-sql-parquet-$version.jar

 debezium-avro-confluent:
   name: Debezium
@@ -102,53 +102,59 @@ raw:
   category: format
   builtin: true

+files:
+  name: Files
+  category: connector
+  maven: flink-connector-files
+  sql_url: https://repo.maven.apache.org/maven2/org/apache/flink/flink-connector-files/$version/flink-connector-files-$version.jar
+
 elastic:
   name: Elasticsearch
   category: connector
   versions:
   - version: 6.x
-    maven: flink-connector-elasticsearch6$scala_version
-    sql_url: https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch6$scala_version/$version/flink-sql-connector-elasticsearch6$scala_version-$version.jar
+    maven: flink-connector-elasticsearch6
+    sql_url: https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch6/$version/flink-sql-connector-elasticsearch6-$version.jar
   - version: 7.x and later versions
-    maven: flink-connector-elasticsearch7$scala_version
-    sql_url: https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch7$scala_version/$version/flink-sql-connector-elasticsearch7$scala_version-$version.jar
+    maven: flink-connector-elasticsearch7
+    sql_url: https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch7/$version/flink-sql-connector-elasticsearch7-$version.jar

 hbase:
   name: HBase
   category: connector
   versions:
   - version: 1.4.x
-    maven: flink-connector-hbase-1.4$scala_version
-    sql_url: https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-hbase-1.4$scala_version/$version/flink-sql-connector-hbase-1.4$scala_version-$version.jar
+    maven: flink-connector-hbase-1.4
+    sql_url: https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-hbase-1.4/$version/flink-sql-connector-hbase-1.4-$version.jar
   - version: 2.2.x
-    maven: flink-connector-hbase-2.2$scala_version
-    sql_url: https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-hbase-2.2$scala_version/$version/flink-sql-connector-hbase-2.2$scala_version-$version.jar
+    maven: flink-connector-hbase-2.2
+    sql_url: https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-hbase-2.2/$version/flink-sql-connector-hbase-2.2-$version.jar

 jdbc:
   name: JDBC
   category: connector
-  maven: flink-connector-jdbc$scala_version
-  sql_url: https://repo.maven.apache.org/maven2/org/apache/flink/flink-connector-jdbc$scala_version/$version/flink-connector-jdbc$scala_version-$version.jar
+  maven: flink-connector-jdbc
+  sql_url: https://repo.maven.apache.org/maven2/org/apache/flink/flink-connector-jdbc/$version/flink-connector-jdbc-$version.jar

 kafka:
   name: Kafka
   category: connector
   versions:
   - version: universal
-    maven: flink-connector-kafka$scala_version
-    sql_url: https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka$scala_version/$version/flink-sql-connector-kafka$scala_version-$version.jar
+    maven: flink-connector-kafka
+    sql_url: https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka/$version/flink-sql-connector-kafka-$version.jar

 upsert-kafka:
   name: Upsert Kafka
   category: connector
   versions:
   - version: universal
-    maven: flink-connector-kafka$scala_version
-    sql_url: https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka$scala_version/$version/flink-sql-connector-kafka$scala_version-$version.jar
+    maven: flink-connector-kafka
+    sql_url: https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka/$version/flink-sql-connector-kafka-$version.jar

 kinesis:
   name: Kinesis
   category: connector
-  maven: flink-connector-kinesis$scala_version
-  sql_url: https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kinesis$scala_version/$version/flink-sql-connector-kinesis$scala_version-$version.jar
+  maven: flink-connector-kinesis
+  sql_url: https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kinesis/$version/flink-sql-connector-kinesis-$version.jar
@@ -1,4 +1,3 @@
 Class <org.apache.flink.connector.jdbc.internal.options.JdbcConnectorOptions> does not reside in a package 'org.apache.flink..table' in (JdbcConnectorOptions.java:0)
 Class <org.apache.flink.formats.raw.RawFormatOptions> does not reside in a package 'org.apache.flink..table' in (RawFormatOptions.java:0)
-Class <org.apache.flink.table.filesystem.FileSystemConnectorOptions> does not reside in a package 'org.apache.flink..table' in (FileSystemConnectorOptions.java:0)
 org.apache.flink.connector.jdbc.internal.options.JdbcConnectorOptions does not satisfy: annotated with @PublicEvolving or annotated with @Public
5 changes: 5 additions & 0 deletions flink-connectors/flink-connector-elasticsearch-base/pom.xml
@@ -44,6 +44,11 @@ under the License.

         <!-- Core -->

+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-base</artifactId>
+            <version>${project.version}</version>
+        </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-streaming-java</artifactId>
38 changes: 37 additions & 1 deletion flink-connectors/flink-connector-files/pom.xml
@@ -58,6 +58,21 @@ under the License.
             <version>${project.version}</version>
         </dependency>

+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-api-java-bridge</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+            <optional>true</optional>
+        </dependency>
+
         <!-- test dependencies -->

         <dependency>
@@ -86,12 +101,13 @@ under the License.
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-streaming-java</artifactId>
             <version>${project.version}</version>
+            <type>test-jar</type>
             <scope>test</scope>
         </dependency>

         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-streaming-java</artifactId>
+            <artifactId>flink-table-common</artifactId>
             <version>${project.version}</version>
             <type>test-jar</type>
             <scope>test</scope>
@@ -112,6 +128,26 @@ under the License.
                 </execution>
             </executions>
         </plugin>
+        <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-shade-plugin</artifactId>
+            <executions>
+                <execution>
+                    <id>shade-flink</id>
+                    <phase>package</phase>
+                    <goals>
+                        <goal>shade</goal>
+                    </goals>
+                    <configuration>
+                        <artifactSet>
+                            <includes>
+                                <include>org.apache.flink:flink-connector-base</include>
+                            </includes>
+                        </artifactSet>
+                    </configuration>
+                </execution>
+            </executions>
+        </plugin>
     </plugins>
 </build>
 </project>
@@ -16,7 +16,7 @@
  * limitations under the License.
  */

-package org.apache.flink.table.filesystem;
+package org.apache.flink.connector.file.table;

 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DelegatingConfiguration;
@@ -31,9 +31,6 @@
 import java.util.List;
 import java.util.stream.Collectors;

-import static org.apache.flink.table.filesystem.FileSystemConnectorOptions.PARTITION_DEFAULT_NAME;
-import static org.apache.flink.table.filesystem.FileSystemConnectorOptions.PATH;
-
 /** Abstract File system table for providing some common methods. */
 abstract class AbstractFileSystemTable {

@@ -52,8 +49,8 @@ abstract class AbstractFileSystemTable {
         this.tableOptions = new Configuration();
         context.getCatalogTable().getOptions().forEach(tableOptions::setString);
         this.schema = context.getCatalogTable().getResolvedSchema();
-        this.path = new Path(tableOptions.get(PATH));
-        this.defaultPartName = tableOptions.get(PARTITION_DEFAULT_NAME);
+        this.path = new Path(tableOptions.get(FileSystemConnectorOptions.PATH));
+        this.defaultPartName = tableOptions.get(FileSystemConnectorOptions.PARTITION_DEFAULT_NAME);

         this.partitionKeys = context.getCatalogTable().getPartitionKeys();
     }
@@ -16,7 +16,9 @@
  * limitations under the License.
  */

-package org.apache.flink.table.runtime.util;
+package org.apache.flink.connector.file.table;
+
+import org.apache.flink.annotation.Internal;

 import java.util.ArrayList;
 import java.util.Deque;
@@ -25,6 +27,7 @@
 import java.util.function.Function;

 /** A bin packing implementation. */
+@Internal
 public class BinPacking {
     private BinPacking() {}
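
For orientation, a bin-packing utility of this kind groups items into bins until each bin reaches a target weight (e.g. batching files up to a size threshold). A standalone sketch of the greedy approach (hypothetical helper for illustration; not the exact API of the Flink class above):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Hypothetical sketch: greedy bin packing by cumulative weight. */
final class BinPackingSketch {
    private BinPackingSketch() {}

    static <T> List<List<T>> pack(Iterable<T> items, Function<T, Long> weightFunc, long targetWeight) {
        List<List<T>> bins = new ArrayList<>();
        List<T> current = new ArrayList<>();
        long currentWeight = 0L;
        for (T item : items) {
            current.add(item);
            currentWeight += weightFunc.apply(item);
            if (currentWeight >= targetWeight) { // close the bin once the target is reached
                bins.add(current);
                current = new ArrayList<>();
                currentWeight = 0L;
            }
        }
        if (!current.isEmpty()) {
            bins.add(current); // remainder forms the last bin
        }
        return bins;
    }
}
```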
@@ -16,22 +16,24 @@
  * limitations under the License.
  */

-package org.apache.flink.table.filesystem;
+package org.apache.flink.connector.file.table;

+import org.apache.flink.annotation.Internal;
 import org.apache.flink.connector.file.src.reader.BulkFormat;
 import org.apache.flink.connector.file.src.util.CheckpointedPosition;
 import org.apache.flink.connector.file.src.util.MutableRecordAndPosition;
 import org.apache.flink.connector.file.src.util.RecordAndPosition;
 import org.apache.flink.connector.file.src.util.RecyclableIterator;
-import org.apache.flink.table.data.ColumnarRowData;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.columnar.ColumnarRowData;

 import javax.annotation.Nullable;

 /**
  * A {@link BulkFormat.RecordIterator} that returns {@link RowData}s. The next row is set by {@link
  * ColumnarRowData#setRowId}.
  */
+@Internal
 public class ColumnarRowIterator extends RecyclableIterator<RowData> {

     private final ColumnarRowData rowData;
@@ -16,7 +16,7 @@
  * limitations under the License.
  */

-package org.apache.flink.table.filesystem;
+package org.apache.flink.connector.file.table;

 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -16,8 +16,9 @@
  * limitations under the License.
  */

-package org.apache.flink.table.filesystem;
+package org.apache.flink.connector.file.table;

+import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.data.TimestampData;

 import javax.annotation.Nullable;
@@ -44,6 +45,7 @@
  * Default {@link PartitionTimeExtractor}. See {@link
  * FileSystemConnectorOptions#PARTITION_TIME_EXTRACTOR_TIMESTAMP_PATTERN}.
  */
+@Internal
 public class DefaultPartTimeExtractor implements PartitionTimeExtractor {

     private static final DateTimeFormatter TIMESTAMP_FORMATTER =
@@ -16,8 +16,9 @@
  * limitations under the License.
  */

-package org.apache.flink.table.filesystem;
+package org.apache.flink.connector.file.table;

+import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.io.DelimitedInputFormat;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -41,9 +42,10 @@
 import java.util.Queue;

 import static org.apache.flink.connector.file.src.util.CheckpointedPosition.NO_OFFSET;
-import static org.apache.flink.table.data.vector.VectorizedColumnBatch.DEFAULT_SIZE;
+import static org.apache.flink.table.data.columnar.vector.VectorizedColumnBatch.DEFAULT_SIZE;

 /** Adapter to turn a {@link DeserializationSchema} into a {@link BulkFormat}. */
+@Internal
 public class DeserializationSchemaAdapter implements BulkFormat<RowData, FileSourceSplit> {

     private static final int BATCH_SIZE = 100;
@@ -16,7 +16,7 @@
  * limitations under the License.
  */

-package org.apache.flink.table.filesystem;
+package org.apache.flink.connector.file.table;

 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.io.OutputFormat;
@@ -16,7 +16,7 @@
  * limitations under the License.
  */

-package org.apache.flink.table.filesystem;
+package org.apache.flink.connector.file.table;

 import org.apache.flink.annotation.Internal;
 import org.apache.flink.core.fs.Path;
@@ -16,7 +16,7 @@
  * limitations under the License.
  */

-package org.apache.flink.table.filesystem;
+package org.apache.flink.connector.file.table;

 import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.data.ArrayData;
@@ -16,7 +16,7 @@
  * limitations under the License.
  */

-package org.apache.flink.table.filesystem;
+package org.apache.flink.connector.file.table;

 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
@@ -26,7 +26,6 @@
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.utils.PartitionPathUtils;

@@ -57,11 +56,12 @@ class FileInfoExtractorBulkFormat implements BulkFormat<RowData, FileSourceSplit
     public FileInfoExtractorBulkFormat(
             BulkFormat<RowData, FileSourceSplit> wrapped,
             DataType producedDataType,
+            TypeInformation<RowData> producedTypeInformation,
             Map<String, FileSystemTableSource.FileInfoAccessor> metadataColumns,
             List<String> partitionColumns,
             String defaultPartName) {
         this.wrapped = wrapped;
-        this.producedType = InternalTypeInfo.of(producedDataType.getLogicalType());
+        this.producedType = producedTypeInformation;
         this.defaultPartName = defaultPartName;

         // Compute index mapping for the extended row and the functions to compute metadata
@@ -16,7 +16,7 @@
  * limitations under the License.
  */

-package org.apache.flink.table.filesystem;
+package org.apache.flink.connector.file.table;

 import org.apache.flink.annotation.Internal;
 import org.apache.flink.core.fs.FileSystem;
@@ -27,8 +27,8 @@
 import java.util.List;
 import java.util.Map;

-import static org.apache.flink.table.filesystem.PartitionTempFileManager.collectPartSpecToPaths;
-import static org.apache.flink.table.filesystem.PartitionTempFileManager.listTaskTemporaryPaths;
+import static org.apache.flink.connector.file.table.PartitionTempFileManager.collectPartSpecToPaths;
+import static org.apache.flink.connector.file.table.PartitionTempFileManager.listTaskTemporaryPaths;

 /**
  * File system file committer implementation. It moves all files to output path from temporary path.
@@ -16,7 +16,7 @@
  * limitations under the License.
  */

-package org.apache.flink.table.filesystem;
+package org.apache.flink.connector.file.table;

 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.configuration.ConfigOption;